HBASE-19618 Remove replicationQueuesClient.class/replicationQueues.class config and remove table based ReplicationQueuesClient/ReplicationQueues implementation

This commit is contained in:
Guanghao Zhang 2017-12-25 11:44:18 +08:00
parent 5f548146af
commit 65159dc256
8 changed files with 12 additions and 1252 deletions

View File

@ -31,21 +31,16 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@InterfaceAudience.Private
public class ReplicationFactory {
public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
throws Exception {
Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
"replication.replicationQueues.class", defaultReplicationQueueClass);
return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class,
args);
}
public static ReplicationQueuesClient getReplicationQueuesClient(
ReplicationQueuesClientArguments args) throws Exception {
Class<?> classToBuild = args.getConf().getClass(
"hbase.region.replica.replication.replicationQueuesClient.class",
ReplicationQueuesClientZKImpl.class);
return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
public static ReplicationQueuesClient
getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception {
return (ReplicationQueuesClient) ConstructorUtils
.invokeConstructor(ReplicationQueuesClientZKImpl.class, args);
}
public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,

View File

@ -1,113 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes
* the ReplicationTableBase to access the Replication Table.
*/
@InterfaceAudience.Private
public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
implements ReplicationQueuesClient {
public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args)
throws IOException {
super(args.getConf(), args.getAbortable());
}
public TableBasedReplicationQueuesClientImpl(Configuration conf,
Abortable abortable) throws IOException {
super(conf, abortable);
}
@Override
public void init() throws ReplicationException{
// no-op
}
@Override
public List<String> getListOfReplicators() {
return super.getListOfReplicators();
}
@Override
public List<String> getLogsInQueue(String serverName, String queueId) {
return super.getLogsInQueue(serverName, queueId);
}
@Override
public List<String> getAllQueues(String serverName) {
return super.getAllQueues(serverName);
}
@Override
public Set<String> getAllWALs() {
Set<String> allWals = new HashSet<>();
ResultScanner allQueues = null;
try (Table replicationTable = getOrBlockOnReplicationTable()) {
allQueues = replicationTable.getScanner(new Scan());
for (Result queue : allQueues) {
for (String wal : readWALsFromResult(queue)) {
allWals.add(wal);
}
}
} catch (IOException e) {
String errMsg = "Failed getting all WAL's in Replication Table";
abortable.abort(errMsg, e);
} finally {
if (allQueues != null) {
allQueues.close();
}
}
return allWals;
}
@Override
public int getHFileRefsNodeChangeVersion() throws KeeperException {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
@Override
public List<String> getReplicableHFiles(String peerId) throws KeeperException {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
}

View File

@ -1,448 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* This class provides an implementation of the ReplicationQueues interface using an HBase table
* "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table.
*/
@InterfaceAudience.Private
public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
implements ReplicationQueues {
private static final Logger LOG = LoggerFactory.getLogger(TableBasedReplicationQueuesImpl.class);
// Common byte values used in replication offset tracking
private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
private String serverName = null;
private byte[] serverNameBytes = null;
// TODO: Only use this variable temporarily. Eventually we want to use HBase to store all
// TODO: replication information
private ReplicationStateZKBase replicationState;
public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
this(args.getConf(), args.getAbortable(), args.getZk());
}
public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZKWatcher zkw)
throws IOException {
super(conf, abort);
replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
}
@Override
public void init(String serverName) throws ReplicationException {
this.serverName = serverName;
this.serverNameBytes = Bytes.toBytes(serverName);
}
@Override
public List<String> getListOfReplicators() {
return super.getListOfReplicators();
}
@Override
public void removeQueue(String queueId) {
try {
byte[] rowKey = queueIdToRowKey(queueId);
if (checkQueueExists(queueId)) {
Delete deleteQueue = new Delete(rowKey);
safeQueueUpdate(deleteQueue);
} else {
LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " +
"from the replication table while removing the queue");
}
} catch (IOException | ReplicationException e) {
String errMsg = "Failed removing queue queueId=" + queueId;
abortable.abort(errMsg, e);
}
}
@Override
public void addLog(String queueId, String filename) throws ReplicationException {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
if (!checkQueueExists(queueId)) {
// Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
replicationTable.put(putNewQueue);
} else {
// Otherwise simply add the new log and offset as a new column
Put putNewLog = new Put(queueIdToRowKey(queueId));
putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
safeQueueUpdate(putNewLog);
}
} catch (IOException | ReplicationException e) {
String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename;
abortable.abort(errMsg, e);
}
}
@Override
public void removeLog(String queueId, String filename) {
try {
byte[] rowKey = queueIdToRowKey(queueId);
Delete delete = new Delete(rowKey);
delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
safeQueueUpdate(delete);
} catch (IOException | ReplicationException e) {
String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
abortable.abort(errMsg, e);
}
}
@Override
public void setLogPosition(String queueId, String filename, long position) {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
byte[] rowKey = queueIdToRowKey(queueId);
// Check that the log exists. addLog() must have been called before setLogPosition().
Get checkLogExists = new Get(rowKey);
checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
if (!replicationTable.exists(checkLogExists)) {
String errMsg = "Could not set position of non-existent log from queueId=" + queueId +
", filename=" + filename;
abortable.abort(errMsg, new ReplicationException(errMsg));
return;
}
// Update the log offset if it exists
Put walAndOffset = new Put(rowKey);
walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
safeQueueUpdate(walAndOffset);
} catch (IOException | ReplicationException e) {
String errMsg = "Failed writing log position queueId=" + queueId + "filename=" +
filename + " position=" + position;
abortable.abort(errMsg, e);
}
}
@Override
public long getLogPosition(String queueId, String filename) throws ReplicationException {
try {
byte[] rowKey = queueIdToRowKey(queueId);
Get getOffset = new Get(rowKey);
getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
Result result = getResultIfOwner(getOffset);
if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
throw new ReplicationException("Could not read empty result while getting log position " +
"queueId=" + queueId + ", filename=" + filename);
}
return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
} catch (IOException e) {
throw new ReplicationException("Could not get position in log for queueId=" + queueId +
", filename=" + filename);
}
}
@Override
public void removeAllQueues() {
List<String> myQueueIds = getAllQueues();
for (String queueId : myQueueIds) {
removeQueue(queueId);
}
}
@Override
public List<String> getLogsInQueue(String queueId) {
String errMsg = "Failed getting logs in queue queueId=" + queueId;
byte[] rowKey = queueIdToRowKey(queueId);
List<String> logs = new ArrayList<>();
try {
Get getQueue = new Get(rowKey);
Result queue = getResultIfOwner(getQueue);
if (queue == null || queue.isEmpty()) {
String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
return null;
}
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
for(byte[] cQualifier : familyMap.keySet()) {
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
COL_QUEUE_OWNER_HISTORY)) {
continue;
}
logs.add(Bytes.toString(cQualifier));
}
} catch (IOException e) {
abortable.abort(errMsg, e);
return null;
}
return logs;
}
@Override
public List<String> getAllQueues() {
return getAllQueues(serverName);
}
@Override public List<String> getUnClaimedQueueIds(String regionserver) {
if (isThisOurRegionServer(regionserver)) {
return null;
}
try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) {
List<String> res = new ArrayList<>();
for (Result queue : queuesToClaim) {
String rowKey = Bytes.toString(queue.getRow());
res.add(rowKey);
}
return res.isEmpty() ? null : res;
} catch (IOException e) {
String errMsg = "Failed getUnClaimedQueueIds";
abortable.abort(errMsg, e);
}
return null;
}
@Override public void removeReplicatorIfQueueIsEmpty(String regionserver) {
// Do nothing here
}
@Override
public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
if (isThisOurRegionServer(regionserver)) {
return null;
}
try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){
for (Result queue : queuesToClaim) {
String rowKey = Bytes.toString(queue.getRow());
if (!rowKey.equals(queueId)){
continue;
}
if (attemptToClaimQueue(queue, regionserver)) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
SortedSet<String> sortedLogs = new TreeSet<>();
List<String> logs = getLogsInQueue(queue.getRow());
for (String log : logs) {
sortedLogs.add(log);
}
LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver);
return new Pair<>(rowKey, sortedLogs);
} else {
// Delete orphaned queues
removeQueue(Bytes.toString(queue.getRow()));
LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " +
regionserver);
}
}
}
} catch (IOException | KeeperException e) {
String errMsg = "Failed claiming queues for regionserver=" + regionserver;
abortable.abort(errMsg, e);
}
return null;
}
@Override
public boolean isThisOurRegionServer(String regionserver) {
return this.serverName.equals(regionserver);
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
@Override
public void removePeerFromHFileRefs(String peerId) {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
@Override
public void removeHFileRefs(String peerId, List<String> files) {
// TODO
throw new NotImplementedException(HConstants.NOT_IMPLEMENTED);
}
private String buildQueueRowKey(String queueId) {
return buildQueueRowKey(serverName, queueId);
}
/**
* Convenience method that gets the row key of the queue specified by queueId
* @param queueId queueId of a queue in this server
* @return the row key of the queue in the Replication Table
*/
private byte[] queueIdToRowKey(String queueId) {
return queueIdToRowKey(serverName, queueId);
}
/**
* See safeQueueUpdate(RowMutations mutate)
*
* @param put Row mutation to perform on the queue
*/
private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
RowMutations mutations = new RowMutations(put.getRow());
mutations.add(put);
safeQueueUpdate(mutations);
}
/**
* See safeQueueUpdate(RowMutations mutate)
*
* @param delete Row mutation to perform on the queue
*/
private void safeQueueUpdate(Delete delete) throws ReplicationException,
IOException{
RowMutations mutations = new RowMutations(delete.getRow());
mutations.add(delete);
safeQueueUpdate(mutations);
}
/**
* Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column
* of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost
* ownership of the column or an IO Exception has occurred during the transaction.
*
* @param mutate Mutation to perform on a given queue
*/
private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
try (Table replicationTable = getOrBlockOnReplicationTable()) {
boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE)
.qualifier(COL_QUEUE_OWNER).ifEquals(serverNameBytes).thenMutate(mutate);
if (!updateSuccess) {
throw new ReplicationException("Failed to update Replication Table because we lost queue " +
" ownership");
}
}
}
/**
* Check if the queue specified by queueId is stored in HBase
*
* @param queueId Either raw or reclaimed format of the queueId
* @return Whether the queue is stored in HBase
* @throws IOException
*/
private boolean checkQueueExists(String queueId) throws IOException {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
byte[] rowKey = queueIdToRowKey(queueId);
return replicationTable.exists(new Get(rowKey));
}
}
/**
* Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
* recently killed server is still the OWNER before we claim it.
*
* @param queue The queue that we are trying to claim
* @param originalServer The server that originally owned the queue
* @return Whether we successfully claimed the queue
* @throws IOException
*/
private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{
Put putQueueNameAndHistory = new Put(queue.getRow());
putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName));
String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
COL_QUEUE_OWNER_HISTORY)), originalServer);
putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
Bytes.toBytes(newOwnerHistory));
RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
claimAndRenameQueue.add(putQueueNameAndHistory);
// Attempt to claim ownership for this queue by checking if the current OWNER is the original
// server. If it is not then another RS has already claimed it. If it is we set ourselves as the
// new owner and update the queue's history
try (Table replicationTable = getOrBlockOnReplicationTable()) {
boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE)
.qualifier(COL_QUEUE_OWNER).ifEquals(Bytes.toBytes(originalServer))
.thenMutate(claimAndRenameQueue);
return success;
}
}
/**
* Attempts to run a Get on some queue. Will only return a non-null result if we currently own
* the queue.
*
* @param get The Get that we want to query
* @return The result of the Get if this server is the owner of the queue. Else it returns null.
* @throws IOException
*/
private Result getResultIfOwner(Get get) throws IOException {
Scan scan = new Scan(get);
// Check if the Get currently contains all columns or only specific columns
if (scan.getFamilyMap().size() > 0) {
// Add the OWNER column if the scan is already only over specific columns
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
}
scan.setMaxResultSize(1);
SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareOperator.EQUAL, serverNameBytes);
scan.setFilter(checkOwner);
ResultScanner scanner = null;
try (Table replicationTable = getOrBlockOnReplicationTable()) {
scanner = replicationTable.getScanner(scan);
Result result = scanner.next();
return (result == null || result.isEmpty()) ? null : result;
} finally {
if (scanner != null) {
scanner.close();
}
}
}
}

View File

@ -157,10 +157,8 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.AccessDeniedException;
@ -1148,15 +1146,12 @@ public class HMaster extends HRegionServer implements MasterServices {
}
// Start replication zk node cleaner
if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) {
try {
replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
getChoreService().scheduleChore(replicationZKNodeCleanerChore);
} catch (Exception e) {
LOG.error("start replicationZKNodeCleanerChore failed", e);
}
try {
replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
getChoreService().scheduleChore(replicationZKNodeCleanerChore);
} catch (Exception e) {
LOG.error("start replicationZKNodeCleanerChore failed", e);
}
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
getChoreService().scheduleChore(replicationMetaCleaner);

View File

@ -94,8 +94,6 @@ public class TestMultiSlaveReplication {
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
conf1.setClass("hbase.region.replica.replication.replicationQueues.class",
ReplicationQueuesZKImpl.class, ReplicationQueues.class);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();

View File

@ -1,495 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
import static junit.framework.TestCase.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationStateHBaseImpl {
private static Configuration conf;
private static HBaseTestingUtility utility;
private static ZKWatcher zkw;
private static String replicationZNode;
private static ReplicationQueues rq1;
private static ReplicationQueues rq2;
private static ReplicationQueues rq3;
private static ReplicationQueuesClient rqc;
private static ReplicationPeers rp;
private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L)
.toString();
private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L)
.toString();
private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
.toString();
private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
.toString();
private static DummyServer ds0;
private static DummyServer ds1;
private static DummyServer ds2;
private static DummyServer ds3;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.replicationQueues.class",
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility.startMiniCluster();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
}
@Before
public void setUp() {
try {
ds0 = new DummyServer(server0);
rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
conf, ds0));
ds1 = new DummyServer(server1);
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq1.init(server1);
ds2 = new DummyServer(server2);
rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
rq2.init(server2);
ds3 = new DummyServer(server3);
rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
rq3.init(server3);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
rp.init();
} catch (Exception e) {
fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage());
}
}
@Test
public void checkNamingSchema() throws Exception {
assertTrue(rq1.isThisOurRegionServer(server1));
assertTrue(!rq1.isThisOurRegionServer(server1 + "a"));
assertTrue(!rq1.isThisOurRegionServer(null));
}
@Test
public void testSingleReplicationQueuesHBaseImpl() {
try {
// Test adding in WAL files
assertEquals(0, rq1.getAllQueues().size());
rq1.addLog("Queue1", "WALLogFile1.1");
assertEquals(1, rq1.getAllQueues().size());
rq1.addLog("Queue1", "WALLogFile1.2");
rq1.addLog("Queue1", "WALLogFile1.3");
rq1.addLog("Queue1", "WALLogFile1.4");
rq1.addLog("Queue2", "WALLogFile2.1");
rq1.addLog("Queue3", "WALLogFile3.1");
assertEquals(3, rq1.getAllQueues().size());
assertEquals(4, rq1.getLogsInQueue("Queue1").size());
assertEquals(1, rq1.getLogsInQueue("Queue2").size());
assertEquals(1, rq1.getLogsInQueue("Queue3").size());
// Make sure that abortCount is still 0
assertEquals(0, ds1.getAbortCount());
// Make sure that getting a log from a non-existent queue triggers an abort
assertNull(rq1.getLogsInQueue("Queue4"));
assertEquals(1, ds1.getAbortCount());
} catch (ReplicationException e) {
e.printStackTrace();
fail("testAddLog received a ReplicationException");
}
try {
// Test updating the log positions
assertEquals(0L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
rq1.setLogPosition("Queue1", "WALLogFile1.1", 123L);
assertEquals(123L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
rq1.setLogPosition("Queue1", "WALLogFile1.1", 123456789L);
assertEquals(123456789L, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
rq1.setLogPosition("Queue2", "WALLogFile2.1", 242L);
assertEquals(242L, rq1.getLogPosition("Queue2", "WALLogFile2.1"));
rq1.setLogPosition("Queue3", "WALLogFile3.1", 243L);
assertEquals(243L, rq1.getLogPosition("Queue3", "WALLogFile3.1"));
// Test that setting log positions in non-existing logs will cause an abort
assertEquals(1, ds1.getAbortCount());
rq1.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L);
assertEquals(2, ds1.getAbortCount());
rq1.setLogPosition("NotHereQueue", "NotHereFile", 243L);
assertEquals(3, ds1.getAbortCount());
rq1.setLogPosition("Queue1", "NotHereFile", 243l);
assertEquals(4, ds1.getAbortCount());
// Test reading log positions for non-existent queues and WAL's
try {
rq1.getLogPosition("Queue1", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
"non-existent WAL");
} catch (ReplicationException e) {
}
try {
rq1.getLogPosition("NotHereQueue", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
"non-existent queue");
} catch (ReplicationException e) {
}
// Test removing logs
rq1.removeLog("Queue1", "WALLogFile1.1");
assertEquals(3, rq1.getLogsInQueue("Queue1").size());
// Test removing queues
rq1.removeQueue("Queue2");
assertEquals(2, rq1.getAllQueues().size());
assertNull(rq1.getLogsInQueue("Queue2"));
// Test that getting logs from a non-existent queue aborts
assertEquals(5, ds1.getAbortCount());
// Test removing all queues for a Region Server
rq1.removeAllQueues();
assertEquals(0, rq1.getAllQueues().size());
assertNull(rq1.getLogsInQueue("Queue1"));
// Test that getting logs from a non-existent queue aborts
assertEquals(6, ds1.getAbortCount());
// Test removing a non-existent queue does not cause an abort. This is because we can
// attempt to remove a queue that has no corresponding Replication Table row (if we never
// registered a WAL for it)
rq1.removeQueue("NotHereQueue");
assertEquals(6, ds1.getAbortCount());
} catch (ReplicationException e) {
e.printStackTrace();
fail("testAddLog received a ReplicationException");
}
}
@Test
public void TestMultipleReplicationQueuesHBaseImpl () {
try {
rp.registerPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
rp.registerPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
rp.registerPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
} catch (ReplicationException e) {
fail("Failed to add peers to ReplicationPeers");
}
try {
// Test adding in WAL files
rq1.addLog("Queue1", "WALLogFile1.1");
rq1.addLog("Queue1", "WALLogFile1.2");
rq1.addLog("Queue1", "WALLogFile1.3");
rq1.addLog("Queue1", "WALLogFile1.4");
rq1.addLog("Queue2", "WALLogFile2.1");
rq1.addLog("Queue3", "WALLogFile3.1");
rq2.addLog("Queue1", "WALLogFile1.1");
rq2.addLog("Queue1", "WALLogFile1.2");
rq2.addLog("Queue2", "WALLogFile2.1");
rq3.addLog("Queue1", "WALLogFile1.1");
// Test adding logs to replication queues
assertEquals(3, rq1.getAllQueues().size());
assertEquals(2, rq2.getAllQueues().size());
assertEquals(1, rq3.getAllQueues().size());
assertEquals(4, rq1.getLogsInQueue("Queue1").size());
assertEquals(1, rq1.getLogsInQueue("Queue2").size());
assertEquals(1, rq1.getLogsInQueue("Queue3").size());
assertEquals(2, rq2.getLogsInQueue("Queue1").size());
assertEquals(1, rq2.getLogsInQueue("Queue2").size());
assertEquals(1, rq3.getLogsInQueue("Queue1").size());
} catch (ReplicationException e) {
e.printStackTrace();
fail("testAddLogs received a ReplicationException");
}
try {
// Test setting and reading offset in queues
rq1.setLogPosition("Queue1", "WALLogFile1.1", 1l);
rq1.setLogPosition("Queue1", "WALLogFile1.2", 2l);
rq1.setLogPosition("Queue1", "WALLogFile1.3", 3l);
rq1.setLogPosition("Queue2", "WALLogFile2.1", 4l);
rq1.setLogPosition("Queue2", "WALLogFile2.2", 5l);
rq1.setLogPosition("Queue3", "WALLogFile3.1", 6l);
rq2.setLogPosition("Queue1", "WALLogFile1.1", 7l);
rq2.setLogPosition("Queue2", "WALLogFile2.1", 8l);
rq3.setLogPosition("Queue1", "WALLogFile1.1", 9l);
assertEquals(1l, rq1.getLogPosition("Queue1", "WALLogFile1.1"));
assertEquals(2l, rq1.getLogPosition("Queue1", "WALLogFile1.2"));
assertEquals(4l, rq1.getLogPosition("Queue2", "WALLogFile2.1"));
assertEquals(6l, rq1.getLogPosition("Queue3", "WALLogFile3.1"));
assertEquals(7l, rq2.getLogPosition("Queue1", "WALLogFile1.1"));
assertEquals(8l, rq2.getLogPosition("Queue2", "WALLogFile2.1"));
assertEquals(9l, rq3.getLogPosition("Queue1", "WALLogFile1.1"));
assertEquals(rq1.getListOfReplicators().size(), 3);
assertEquals(rq2.getListOfReplicators().size(), 3);
assertEquals(rq3.getListOfReplicators().size(), 3);
} catch (ReplicationException e) {
fail("testAddLogs threw a ReplicationException");
}
try {
// Test claiming queues
List<String> claimedQueuesFromRq2 = rq1.getUnClaimedQueueIds(server2);
// Check to make sure that list of peers with outstanding queues is decremented by one
// after claimQueues
// Check to make sure that we claimed the proper number of queues
assertEquals(2, claimedQueuesFromRq2.size());
assertTrue(claimedQueuesFromRq2.contains("Queue1-" + server2));
assertTrue(claimedQueuesFromRq2.contains("Queue2-" + server2));
assertEquals(2, rq1.claimQueue(server2, "Queue1-" + server2).getSecond().size());
assertEquals(1, rq1.claimQueue(server2, "Queue2-" + server2).getSecond().size());
rq1.removeReplicatorIfQueueIsEmpty(server2);
assertEquals(rq1.getListOfReplicators().size(), 2);
assertEquals(rq2.getListOfReplicators().size(), 2);
assertEquals(rq3.getListOfReplicators().size(), 2);
assertEquals(5, rq1.getAllQueues().size());
// Check that all the logs in the other queue were claimed
assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size());
assertEquals(1, rq1.getLogsInQueue("Queue2-" + server2).size());
// Check that the offsets of the claimed queues are the same
assertEquals(7l, rq1.getLogPosition("Queue1-" + server2, "WALLogFile1.1"));
assertEquals(8l, rq1.getLogPosition("Queue2-" + server2, "WALLogFile2.1"));
// Check that the queues were properly removed from rq2
assertEquals(0, rq2.getAllQueues().size());
assertNull(rq2.getLogsInQueue("Queue1"));
assertNull(rq2.getLogsInQueue("Queue2"));
// Check that non-existent peer queues are not claimed
rq1.addLog("UnclaimableQueue", "WALLogFile1.1");
rq1.addLog("UnclaimableQueue", "WALLogFile1.2");
assertEquals(6, rq1.getAllQueues().size());
List<String> claimedQueuesFromRq1 = rq3.getUnClaimedQueueIds(server1);
for(String queue : claimedQueuesFromRq1) {
rq3.claimQueue(server1, queue);
}
rq3.removeReplicatorIfQueueIsEmpty(server1);
assertEquals(rq1.getListOfReplicators().size(), 1);
assertEquals(rq2.getListOfReplicators().size(), 1);
assertEquals(rq3.getListOfReplicators().size(), 1);
// Note that we do not pick up the queue: UnclaimableQueue which was not registered in
// Replication Peers
assertEquals(6, rq3.getAllQueues().size());
// Test claiming non-existing queues
List<String> noQueues = rq3.getUnClaimedQueueIds("NotARealServer");
assertNull(noQueues);
assertEquals(6, rq3.getAllQueues().size());
// Test claiming own queues
noQueues = rq3.getUnClaimedQueueIds(server3);
Assert.assertNull(noQueues);
assertEquals(6, rq3.getAllQueues().size());
// Check that rq3 still remain on list of replicators
assertEquals(1, rq3.getListOfReplicators().size());
} catch (ReplicationException e) {
fail("testClaimQueue threw a ReplicationException");
}
}
@Test
public void TestReplicationQueuesClient() throws Exception{
// Test ReplicationQueuesClient log tracking
rq1.addLog("Queue1", "WALLogFile1.1");
assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size());
rq1.removeLog("Queue1", "WALLogFile1.1");
assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size());
rq2.addLog("Queue2", "WALLogFile2.1");
rq2.addLog("Queue2", "WALLogFile2.2");
assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size());
rq3.addLog("Queue1", "WALLogFile1.1");
rq3.addLog("Queue3", "WALLogFile3.1");
rq3.addLog("Queue3", "WALLogFile3.2");
// Test ReplicationQueueClient log tracking for faulty cases
assertEquals(0, ds0.getAbortCount());
assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue"));
assertNull(rqc.getLogsInQueue(server1, "NotHereQueue"));
assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1"));
assertEquals(3, ds0.getAbortCount());
// Test ReplicationQueueClient replicators
List<String> replicators = rqc.getListOfReplicators();
assertEquals(3, replicators.size());
assertTrue(replicators.contains(server1));
assertTrue(replicators.contains(server2));
rq1.removeQueue("Queue1");
assertEquals(2, rqc.getListOfReplicators().size());
// Test ReplicationQueuesClient queue tracking
assertEquals(0, rqc.getAllQueues(server1).size());
rq1.addLog("Queue2", "WALLogFile2.1");
rq1.addLog("Queue3", "WALLogFile3.1");
assertEquals(2, rqc.getAllQueues(server1).size());
rq1.removeAllQueues();
assertEquals(0, rqc.getAllQueues(server1).size());
// Test ReplicationQueuesClient queue tracking for faulty cases
assertEquals(0, rqc.getAllQueues("NotHereServer").size());
// Test ReplicationQueuesClient get all WAL's
assertEquals(5 , rqc.getAllWALs().size());
rq3.removeLog("Queue1", "WALLogFile1.1");
assertEquals(4, rqc.getAllWALs().size());
rq3.removeAllQueues();
assertEquals(2, rqc.getAllWALs().size());
rq2.removeAllQueues();
assertEquals(0, rqc.getAllWALs().size());
}
@After
public void clearQueues() throws Exception{
rq1.removeAllQueues();
rq2.removeAllQueues();
rq3.removeAllQueues();
assertEquals(0, rq1.getAllQueues().size());
assertEquals(0, rq2.getAllQueues().size());
assertEquals(0, rq3.getAllQueues().size());
ds0.resetAbortCount();
ds1.resetAbortCount();
ds2.resetAbortCount();
ds3.resetAbortCount();
}
@After
public void tearDown() throws KeeperException, IOException {
ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility.shutdownMiniCluster();
utility.shutdownMiniZKCluster();
}
static class DummyServer implements Server {
private String serverName;
private boolean isAborted = false;
private boolean isStopped = false;
private int abortCount = 0;
public DummyServer(String serverName) {
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZKWatcher getZooKeeper() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(this.serverName);
}
@Override
public void abort(String why, Throwable e) {
abortCount++;
this.isAborted = true;
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isStopped = true;
}
@Override
public boolean isStopped() {
return this.isStopped;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
public int getAbortCount() {
return abortCount;
}
public void resetAbortCount() {
abortCount = 0;
}
@Override
public FileSystem getFileSystem() {
return null;
}
@Override
public boolean isStopping() {
return false;
}
@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
}
}

View File

@ -1,109 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization
* should be non-blocking, but any method calls that access the table should be blocking.
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationTableBase {
private static long SLEEP_MILLIS = 5000;
private static long TIME_OUT_MILLIS = 3000;
private static Configuration conf;
private static HBaseTestingUtility utility;
private static ZKWatcher zkw;
private static ReplicationTableBase rb;
private static ReplicationQueues rq;
private static ReplicationQueuesClient rqc;
private volatile boolean asyncRequestSuccess = false;
@Test
public void testSlowStartup() throws Exception{
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.replicationQueues.class",
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
rb = new ReplicationTableBase(conf, zkw) {};
rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
conf, zkw, zkw));
rqc = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, zkw, zkw));
return true;
}
@Override
public String explainFailure() throws Exception {
return "Failed to initialize ReplicationTableBase, TableBasedReplicationQueuesClient and " +
"TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS +
" ms. Their initialization " + "should be non-blocking";
}
});
final RequestReplicationQueueData async = new RequestReplicationQueueData();
async.start();
Thread.sleep(SLEEP_MILLIS);
// Test that the Replication Table has not been assigned and the methods are blocking
assertFalse(rb.getInitializationStatus());
assertFalse(asyncRequestSuccess);
utility.startMiniCluster();
// Test that the methods do return the correct results after getting the table
utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
async.join();
return true;
}
@Override
public String explainFailure() throws Exception {
return "ReplicationQueue failed to return list of replicators even after Replication Table "
+ "was initialized timeout=" + TIME_OUT_MILLIS + " ms";
}
});
assertTrue(asyncRequestSuccess);
}
public class RequestReplicationQueueData extends Thread {
@Override
public void run() {
assertEquals(0, rq.getListOfReplicators().size());
asyncRequestSuccess = true;
}
}
}

View File

@ -1,63 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* Tests the ReplicationSourceManager with TableBasedReplicationQueue's and
* TableBasedReplicationQueuesClient
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationSourceManager {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
conf.setClass("hbase.region.replica.replication.replicationQueues.class",
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility = new HBaseTestingUtility(conf);
utility.startMiniCluster();
Waiter.waitFor(conf, 3 * 1000,
() -> utility.getMiniHBaseCluster().getMaster().isInitialized());
utility.waitUntilAllRegionsAssigned(TableName.valueOf(
NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"));
setupZkAndReplication();
}
}