diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 3ff6914f825..9f4ad18ae0c 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -31,21 +31,16 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @InterfaceAudience.Private public class ReplicationFactory { - public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class; - public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) throws Exception { - Class classToBuild = args.getConf().getClass("hbase.region.replica." + - "replication.replicationQueues.class", defaultReplicationQueueClass); - return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); + return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class, + args); } - public static ReplicationQueuesClient getReplicationQueuesClient( - ReplicationQueuesClientArguments args) throws Exception { - Class classToBuild = args.getConf().getClass( - "hbase.region.replica.replication.replicationQueuesClient.class", - ReplicationQueuesClientZKImpl.class); - return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args); + public static ReplicationQueuesClient + getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception { + return (ReplicationQueuesClient) ConstructorUtils + .invokeConstructor(ReplicationQueuesClientZKImpl.class, args); } public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf, diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java deleted file mode 100644 index 0a8ed312bb6..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes - * the ReplicationTableBase to access the Replication Table. - */ -@InterfaceAudience.Private -public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase - implements ReplicationQueuesClient { - - public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args) - throws IOException { - super(args.getConf(), args.getAbortable()); - } - public TableBasedReplicationQueuesClientImpl(Configuration conf, - Abortable abortable) throws IOException { - super(conf, abortable); - } - - @Override - public void init() throws ReplicationException{ - // no-op - } - - @Override - public List getListOfReplicators() { - return super.getListOfReplicators(); - } - - @Override - public List getLogsInQueue(String serverName, String queueId) { - return super.getLogsInQueue(serverName, queueId); - } - - @Override - public List getAllQueues(String serverName) { - return super.getAllQueues(serverName); - } - - @Override - public Set getAllWALs() { - Set allWals = new HashSet<>(); - ResultScanner allQueues = null; - try (Table replicationTable = getOrBlockOnReplicationTable()) { - allQueues = replicationTable.getScanner(new Scan()); - for (Result queue : allQueues) { - for (String wal : readWALsFromResult(queue)) { - allWals.add(wal); - } - } - } catch (IOException e) { - String errMsg = "Failed getting all WAL's in Replication Table"; - abortable.abort(errMsg, e); - } finally { - if (allQueues != null) { - allQueues.close(); - } - } - return allWals; - } - - @Override - public int getHFileRefsNodeChangeVersion() throws KeeperException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public List getAllPeersFromHFileRefsQueue() throws KeeperException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public List getReplicableHFiles(String peerId) throws KeeperException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java deleted file mode 100644 index b6c849c7351..00000000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ /dev/null @@ -1,448 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.commons.lang3.NotImplementedException; -import org.apache.hadoop.conf.Configuration; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; - -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * This class provides an implementation of the ReplicationQueues interface using an HBase table - * "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table. - */ -@InterfaceAudience.Private -public class TableBasedReplicationQueuesImpl extends ReplicationTableBase - implements ReplicationQueues { - - private static final Logger LOG = LoggerFactory.getLogger(TableBasedReplicationQueuesImpl.class); - - // Common byte values used in replication offset tracking - private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L); - private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes(""); - - private String serverName = null; - private byte[] serverNameBytes = null; - - // TODO: Only use this variable temporarily. Eventually we want to use HBase to store all - // TODO: replication information - private ReplicationStateZKBase replicationState; - - public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException { - this(args.getConf(), args.getAbortable(), args.getZk()); - } - - public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZKWatcher zkw) - throws IOException { - super(conf, abort); - replicationState = new ReplicationStateZKBase(zkw, conf, abort) {}; - } - - @Override - public void init(String serverName) throws ReplicationException { - this.serverName = serverName; - this.serverNameBytes = Bytes.toBytes(serverName); - } - - @Override - public List getListOfReplicators() { - return super.getListOfReplicators(); - } - - @Override - public void removeQueue(String queueId) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - if (checkQueueExists(queueId)) { - Delete deleteQueue = new Delete(rowKey); - safeQueueUpdate(deleteQueue); - } else { - LOG.info("No logs were registered for queue id=" + queueId + " so no rows were removed " + - "from the replication table while removing the queue"); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing queue queueId=" + queueId; - abortable.abort(errMsg, e); - } - } - - @Override - public void addLog(String queueId, String filename) throws ReplicationException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - if (!checkQueueExists(queueId)) { - // Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values - Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId))); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes); - putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES); - putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - replicationTable.put(putNewQueue); - } else { - // Otherwise simply add the new log and offset as a new column - Put putNewLog = new Put(queueIdToRowKey(queueId)); - putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES); - safeQueueUpdate(putNewLog); - } - } catch (IOException | ReplicationException e) { - String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void removeLog(String queueId, String filename) { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Delete delete = new Delete(rowKey); - delete.addColumns(CF_QUEUE, Bytes.toBytes(filename)); - safeQueueUpdate(delete); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename; - abortable.abort(errMsg, e); - } - } - - @Override - public void setLogPosition(String queueId, String filename, long position) { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - byte[] rowKey = queueIdToRowKey(queueId); - // Check that the log exists. addLog() must have been called before setLogPosition(). - Get checkLogExists = new Get(rowKey); - checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename)); - if (!replicationTable.exists(checkLogExists)) { - String errMsg = "Could not set position of non-existent log from queueId=" + queueId + - ", filename=" + filename; - abortable.abort(errMsg, new ReplicationException(errMsg)); - return; - } - // Update the log offset if it exists - Put walAndOffset = new Put(rowKey); - walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position)); - safeQueueUpdate(walAndOffset); - } catch (IOException | ReplicationException e) { - String errMsg = "Failed writing log position queueId=" + queueId + "filename=" + - filename + " position=" + position; - abortable.abort(errMsg, e); - } - } - - @Override - public long getLogPosition(String queueId, String filename) throws ReplicationException { - try { - byte[] rowKey = queueIdToRowKey(queueId); - Get getOffset = new Get(rowKey); - getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename)); - Result result = getResultIfOwner(getOffset); - if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) { - throw new ReplicationException("Could not read empty result while getting log position " + - "queueId=" + queueId + ", filename=" + filename); - } - return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename))); - } catch (IOException e) { - throw new ReplicationException("Could not get position in log for queueId=" + queueId + - ", filename=" + filename); - } - } - - @Override - public void removeAllQueues() { - List myQueueIds = getAllQueues(); - for (String queueId : myQueueIds) { - removeQueue(queueId); - } - } - - @Override - public List getLogsInQueue(String queueId) { - String errMsg = "Failed getting logs in queue queueId=" + queueId; - byte[] rowKey = queueIdToRowKey(queueId); - List logs = new ArrayList<>(); - try { - Get getQueue = new Get(rowKey); - Result queue = getResultIfOwner(getQueue); - if (queue == null || queue.isEmpty()) { - String errMsgLostOwnership = "Failed getting logs for queue queueId=" + - Bytes.toString(rowKey) + " because the queue was missing or we lost ownership"; - abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership)); - return null; - } - Map familyMap = queue.getFamilyMap(CF_QUEUE); - for(byte[] cQualifier : familyMap.keySet()) { - if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, - COL_QUEUE_OWNER_HISTORY)) { - continue; - } - logs.add(Bytes.toString(cQualifier)); - } - } catch (IOException e) { - abortable.abort(errMsg, e); - return null; - } - return logs; - } - - @Override - public List getAllQueues() { - return getAllQueues(serverName); - } - - @Override public List getUnClaimedQueueIds(String regionserver) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)) { - List res = new ArrayList<>(); - for (Result queue : queuesToClaim) { - String rowKey = Bytes.toString(queue.getRow()); - res.add(rowKey); - } - return res.isEmpty() ? null : res; - } catch (IOException e) { - String errMsg = "Failed getUnClaimedQueueIds"; - abortable.abort(errMsg, e); - } - return null; - } - - @Override public void removeReplicatorIfQueueIsEmpty(String regionserver) { - // Do nothing here - } - - @Override - public Pair> claimQueue(String regionserver, String queueId) { - if (isThisOurRegionServer(regionserver)) { - return null; - } - - try (ResultScanner queuesToClaim = getQueuesBelongingToServer(regionserver)){ - for (Result queue : queuesToClaim) { - String rowKey = Bytes.toString(queue.getRow()); - if (!rowKey.equals(queueId)){ - continue; - } - if (attemptToClaimQueue(queue, regionserver)) { - ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey); - if (replicationState.peerExists(replicationQueueInfo.getPeerId())) { - SortedSet sortedLogs = new TreeSet<>(); - List logs = getLogsInQueue(queue.getRow()); - for (String log : logs) { - sortedLogs.add(log); - } - LOG.info(serverName + " has claimed queue " + rowKey + " from " + regionserver); - return new Pair<>(rowKey, sortedLogs); - } else { - // Delete orphaned queues - removeQueue(Bytes.toString(queue.getRow())); - LOG.info(serverName + " has deleted abandoned queue " + queueId + " from " + - regionserver); - } - } - } - } catch (IOException | KeeperException e) { - String errMsg = "Failed claiming queues for regionserver=" + regionserver; - abortable.abort(errMsg, e); - } - return null; - } - - @Override - public boolean isThisOurRegionServer(String regionserver) { - return this.serverName.equals(regionserver); - } - - @Override - public void addPeerToHFileRefs(String peerId) throws ReplicationException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public void removePeerFromHFileRefs(String peerId) { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public void addHFileRefs(String peerId, List> pairs) - throws ReplicationException { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - @Override - public void removeHFileRefs(String peerId, List files) { - // TODO - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - - private String buildQueueRowKey(String queueId) { - return buildQueueRowKey(serverName, queueId); - } - - /** - * Convenience method that gets the row key of the queue specified by queueId - * @param queueId queueId of a queue in this server - * @return the row key of the queue in the Replication Table - */ - private byte[] queueIdToRowKey(String queueId) { - return queueIdToRowKey(serverName, queueId); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param put Row mutation to perform on the queue - */ - private void safeQueueUpdate(Put put) throws ReplicationException, IOException { - RowMutations mutations = new RowMutations(put.getRow()); - mutations.add(put); - safeQueueUpdate(mutations); - } - - /** - * See safeQueueUpdate(RowMutations mutate) - * - * @param delete Row mutation to perform on the queue - */ - private void safeQueueUpdate(Delete delete) throws ReplicationException, - IOException{ - RowMutations mutations = new RowMutations(delete.getRow()); - mutations.add(delete); - safeQueueUpdate(mutations); - } - - /** - * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column - * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost - * ownership of the column or an IO Exception has occurred during the transaction. - * - * @param mutate Mutation to perform on a given queue - */ - private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{ - try (Table replicationTable = getOrBlockOnReplicationTable()) { - boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE) - .qualifier(COL_QUEUE_OWNER).ifEquals(serverNameBytes).thenMutate(mutate); - if (!updateSuccess) { - throw new ReplicationException("Failed to update Replication Table because we lost queue " + - " ownership"); - } - } - } - - /** - * Check if the queue specified by queueId is stored in HBase - * - * @param queueId Either raw or reclaimed format of the queueId - * @return Whether the queue is stored in HBase - * @throws IOException - */ - private boolean checkQueueExists(String queueId) throws IOException { - try (Table replicationTable = getOrBlockOnReplicationTable()) { - byte[] rowKey = queueIdToRowKey(queueId); - return replicationTable.exists(new Get(rowKey)); - } - } - - /** - * Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the - * recently killed server is still the OWNER before we claim it. - * - * @param queue The queue that we are trying to claim - * @param originalServer The server that originally owned the queue - * @return Whether we successfully claimed the queue - * @throws IOException - */ - private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{ - Put putQueueNameAndHistory = new Put(queue.getRow()); - putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName)); - String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE, - COL_QUEUE_OWNER_HISTORY)), originalServer); - putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, - Bytes.toBytes(newOwnerHistory)); - RowMutations claimAndRenameQueue = new RowMutations(queue.getRow()); - claimAndRenameQueue.add(putQueueNameAndHistory); - // Attempt to claim ownership for this queue by checking if the current OWNER is the original - // server. If it is not then another RS has already claimed it. If it is we set ourselves as the - // new owner and update the queue's history - try (Table replicationTable = getOrBlockOnReplicationTable()) { - boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE) - .qualifier(COL_QUEUE_OWNER).ifEquals(Bytes.toBytes(originalServer)) - .thenMutate(claimAndRenameQueue); - return success; - } - } - - /** - * Attempts to run a Get on some queue. Will only return a non-null result if we currently own - * the queue. - * - * @param get The Get that we want to query - * @return The result of the Get if this server is the owner of the queue. Else it returns null. - * @throws IOException - */ - private Result getResultIfOwner(Get get) throws IOException { - Scan scan = new Scan(get); - // Check if the Get currently contains all columns or only specific columns - if (scan.getFamilyMap().size() > 0) { - // Add the OWNER column if the scan is already only over specific columns - scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER); - } - scan.setMaxResultSize(1); - SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, - CompareOperator.EQUAL, serverNameBytes); - scan.setFilter(checkOwner); - ResultScanner scanner = null; - try (Table replicationTable = getOrBlockOnReplicationTable()) { - scanner = replicationTable.getScanner(scan); - Result result = scanner.next(); - return (result == null || result.isEmpty()) ? null : result; - } finally { - if (scanner != null) { - scanner.close(); - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cf950301072..efa17a42933 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -157,10 +157,8 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.AccessDeniedException; @@ -1148,15 +1146,12 @@ public class HMaster extends HRegionServer implements MasterServices { } // Start replication zk node cleaner - if (conf.getClass("hbase.region.replica.replication.replicationQueues.class", - ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) { - try { - replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval, - new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this)); - getChoreService().scheduleChore(replicationZKNodeCleanerChore); - } catch (Exception e) { - LOG.error("start replicationZKNodeCleanerChore failed", e); - } + try { + replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval, + new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this)); + getChoreService().scheduleChore(replicationZKNodeCleanerChore); + } catch (Exception e) { + LOG.error("start replicationZKNodeCleanerChore failed", e); } replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval); getChoreService().scheduleChore(replicationMetaCleaner); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 9da0745f839..c57d9bb2b70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -94,8 +94,6 @@ public class TestMultiSlaveReplication { conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); conf1.setInt("hbase.master.cleaner.interval", 5 * 1000); - conf1.setClass("hbase.region.replica.replication.replicationQueues.class", - ReplicationQueuesZKImpl.class, ReplicationQueues.class); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java deleted file mode 100644 index 1ef525fb1df..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java +++ /dev/null @@ -1,495 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.CoordinatedStateManager; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.List; - -import static junit.framework.TestCase.assertNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@Category({ReplicationTests.class, MediumTests.class}) -public class TestReplicationStateHBaseImpl { - - private static Configuration conf; - private static HBaseTestingUtility utility; - private static ZKWatcher zkw; - private static String replicationZNode; - - private static ReplicationQueues rq1; - private static ReplicationQueues rq2; - private static ReplicationQueues rq3; - private static ReplicationQueuesClient rqc; - private static ReplicationPeers rp; - - - private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L) - .toString(); - private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L) - .toString(); - private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L) - .toString(); - private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L) - .toString(); - - private static DummyServer ds0; - private static DummyServer ds1; - private static DummyServer ds2; - private static DummyServer ds3; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - utility = new HBaseTestingUtility(); - conf = utility.getConfiguration(); - conf.setClass("hbase.region.replica.replication.replicationQueues.class", - TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); - conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", - TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); - utility.startMiniCluster(); - zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); - } - - @Before - public void setUp() { - try { - ds0 = new DummyServer(server0); - rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments( - conf, ds0)); - ds1 = new DummyServer(server1); - rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); - rq1.init(server1); - ds2 = new DummyServer(server2); - rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); - rq2.init(server2); - ds3 = new DummyServer(server3); - rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); - rq3.init(server3); - rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); - rp.init(); - } catch (Exception e) { - fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage()); - } - } - - @Test - public void checkNamingSchema() throws Exception { - assertTrue(rq1.isThisOurRegionServer(server1)); - assertTrue(!rq1.isThisOurRegionServer(server1 + "a")); - assertTrue(!rq1.isThisOurRegionServer(null)); - } - - @Test - public void testSingleReplicationQueuesHBaseImpl() { - try { - // Test adding in WAL files - assertEquals(0, rq1.getAllQueues().size()); - rq1.addLog("Queue1", "WALLogFile1.1"); - assertEquals(1, rq1.getAllQueues().size()); - rq1.addLog("Queue1", "WALLogFile1.2"); - rq1.addLog("Queue1", "WALLogFile1.3"); - rq1.addLog("Queue1", "WALLogFile1.4"); - rq1.addLog("Queue2", "WALLogFile2.1"); - rq1.addLog("Queue3", "WALLogFile3.1"); - assertEquals(3, rq1.getAllQueues().size()); - assertEquals(4, rq1.getLogsInQueue("Queue1").size()); - assertEquals(1, rq1.getLogsInQueue("Queue2").size()); - assertEquals(1, rq1.getLogsInQueue("Queue3").size()); - // Make sure that abortCount is still 0 - assertEquals(0, ds1.getAbortCount()); - // Make sure that getting a log from a non-existent queue triggers an abort - assertNull(rq1.getLogsInQueue("Queue4")); - assertEquals(1, ds1.getAbortCount()); - } catch (ReplicationException e) { - e.printStackTrace(); - fail("testAddLog received a ReplicationException"); - } - try { - - // Test updating the log positions - assertEquals(0L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); - rq1.setLogPosition("Queue1", "WALLogFile1.1", 123L); - assertEquals(123L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); - rq1.setLogPosition("Queue1", "WALLogFile1.1", 123456789L); - assertEquals(123456789L, rq1.getLogPosition("Queue1", "WALLogFile1.1")); - rq1.setLogPosition("Queue2", "WALLogFile2.1", 242L); - assertEquals(242L, rq1.getLogPosition("Queue2", "WALLogFile2.1")); - rq1.setLogPosition("Queue3", "WALLogFile3.1", 243L); - assertEquals(243L, rq1.getLogPosition("Queue3", "WALLogFile3.1")); - - // Test that setting log positions in non-existing logs will cause an abort - assertEquals(1, ds1.getAbortCount()); - rq1.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L); - assertEquals(2, ds1.getAbortCount()); - rq1.setLogPosition("NotHereQueue", "NotHereFile", 243L); - assertEquals(3, ds1.getAbortCount()); - rq1.setLogPosition("Queue1", "NotHereFile", 243l); - assertEquals(4, ds1.getAbortCount()); - - // Test reading log positions for non-existent queues and WAL's - try { - rq1.getLogPosition("Queue1", "NotHereWAL"); - fail("Replication queue should have thrown a ReplicationException for reading from a " + - "non-existent WAL"); - } catch (ReplicationException e) { - } - try { - rq1.getLogPosition("NotHereQueue", "NotHereWAL"); - fail("Replication queue should have thrown a ReplicationException for reading from a " + - "non-existent queue"); - } catch (ReplicationException e) { - } - // Test removing logs - rq1.removeLog("Queue1", "WALLogFile1.1"); - assertEquals(3, rq1.getLogsInQueue("Queue1").size()); - // Test removing queues - rq1.removeQueue("Queue2"); - assertEquals(2, rq1.getAllQueues().size()); - assertNull(rq1.getLogsInQueue("Queue2")); - // Test that getting logs from a non-existent queue aborts - assertEquals(5, ds1.getAbortCount()); - // Test removing all queues for a Region Server - rq1.removeAllQueues(); - assertEquals(0, rq1.getAllQueues().size()); - assertNull(rq1.getLogsInQueue("Queue1")); - // Test that getting logs from a non-existent queue aborts - assertEquals(6, ds1.getAbortCount()); - // Test removing a non-existent queue does not cause an abort. This is because we can - // attempt to remove a queue that has no corresponding Replication Table row (if we never - // registered a WAL for it) - rq1.removeQueue("NotHereQueue"); - assertEquals(6, ds1.getAbortCount()); - } catch (ReplicationException e) { - e.printStackTrace(); - fail("testAddLog received a ReplicationException"); - } - } - - @Test - public void TestMultipleReplicationQueuesHBaseImpl () { - try { - rp.registerPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1")); - rp.registerPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2")); - rp.registerPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3")); - } catch (ReplicationException e) { - fail("Failed to add peers to ReplicationPeers"); - } - try { - // Test adding in WAL files - rq1.addLog("Queue1", "WALLogFile1.1"); - rq1.addLog("Queue1", "WALLogFile1.2"); - rq1.addLog("Queue1", "WALLogFile1.3"); - rq1.addLog("Queue1", "WALLogFile1.4"); - rq1.addLog("Queue2", "WALLogFile2.1"); - rq1.addLog("Queue3", "WALLogFile3.1"); - rq2.addLog("Queue1", "WALLogFile1.1"); - rq2.addLog("Queue1", "WALLogFile1.2"); - rq2.addLog("Queue2", "WALLogFile2.1"); - rq3.addLog("Queue1", "WALLogFile1.1"); - // Test adding logs to replication queues - assertEquals(3, rq1.getAllQueues().size()); - assertEquals(2, rq2.getAllQueues().size()); - assertEquals(1, rq3.getAllQueues().size()); - assertEquals(4, rq1.getLogsInQueue("Queue1").size()); - assertEquals(1, rq1.getLogsInQueue("Queue2").size()); - assertEquals(1, rq1.getLogsInQueue("Queue3").size()); - assertEquals(2, rq2.getLogsInQueue("Queue1").size()); - assertEquals(1, rq2.getLogsInQueue("Queue2").size()); - assertEquals(1, rq3.getLogsInQueue("Queue1").size()); - } catch (ReplicationException e) { - e.printStackTrace(); - fail("testAddLogs received a ReplicationException"); - } - try { - // Test setting and reading offset in queues - rq1.setLogPosition("Queue1", "WALLogFile1.1", 1l); - rq1.setLogPosition("Queue1", "WALLogFile1.2", 2l); - rq1.setLogPosition("Queue1", "WALLogFile1.3", 3l); - rq1.setLogPosition("Queue2", "WALLogFile2.1", 4l); - rq1.setLogPosition("Queue2", "WALLogFile2.2", 5l); - rq1.setLogPosition("Queue3", "WALLogFile3.1", 6l); - rq2.setLogPosition("Queue1", "WALLogFile1.1", 7l); - rq2.setLogPosition("Queue2", "WALLogFile2.1", 8l); - rq3.setLogPosition("Queue1", "WALLogFile1.1", 9l); - assertEquals(1l, rq1.getLogPosition("Queue1", "WALLogFile1.1")); - assertEquals(2l, rq1.getLogPosition("Queue1", "WALLogFile1.2")); - assertEquals(4l, rq1.getLogPosition("Queue2", "WALLogFile2.1")); - assertEquals(6l, rq1.getLogPosition("Queue3", "WALLogFile3.1")); - assertEquals(7l, rq2.getLogPosition("Queue1", "WALLogFile1.1")); - assertEquals(8l, rq2.getLogPosition("Queue2", "WALLogFile2.1")); - assertEquals(9l, rq3.getLogPosition("Queue1", "WALLogFile1.1")); - assertEquals(rq1.getListOfReplicators().size(), 3); - assertEquals(rq2.getListOfReplicators().size(), 3); - assertEquals(rq3.getListOfReplicators().size(), 3); - } catch (ReplicationException e) { - fail("testAddLogs threw a ReplicationException"); - } - try { - // Test claiming queues - List claimedQueuesFromRq2 = rq1.getUnClaimedQueueIds(server2); - // Check to make sure that list of peers with outstanding queues is decremented by one - // after claimQueues - // Check to make sure that we claimed the proper number of queues - assertEquals(2, claimedQueuesFromRq2.size()); - assertTrue(claimedQueuesFromRq2.contains("Queue1-" + server2)); - assertTrue(claimedQueuesFromRq2.contains("Queue2-" + server2)); - assertEquals(2, rq1.claimQueue(server2, "Queue1-" + server2).getSecond().size()); - assertEquals(1, rq1.claimQueue(server2, "Queue2-" + server2).getSecond().size()); - rq1.removeReplicatorIfQueueIsEmpty(server2); - assertEquals(rq1.getListOfReplicators().size(), 2); - assertEquals(rq2.getListOfReplicators().size(), 2); - assertEquals(rq3.getListOfReplicators().size(), 2); - assertEquals(5, rq1.getAllQueues().size()); - // Check that all the logs in the other queue were claimed - assertEquals(2, rq1.getLogsInQueue("Queue1-" + server2).size()); - assertEquals(1, rq1.getLogsInQueue("Queue2-" + server2).size()); - // Check that the offsets of the claimed queues are the same - assertEquals(7l, rq1.getLogPosition("Queue1-" + server2, "WALLogFile1.1")); - assertEquals(8l, rq1.getLogPosition("Queue2-" + server2, "WALLogFile2.1")); - // Check that the queues were properly removed from rq2 - assertEquals(0, rq2.getAllQueues().size()); - assertNull(rq2.getLogsInQueue("Queue1")); - assertNull(rq2.getLogsInQueue("Queue2")); - // Check that non-existent peer queues are not claimed - rq1.addLog("UnclaimableQueue", "WALLogFile1.1"); - rq1.addLog("UnclaimableQueue", "WALLogFile1.2"); - assertEquals(6, rq1.getAllQueues().size()); - List claimedQueuesFromRq1 = rq3.getUnClaimedQueueIds(server1); - for(String queue : claimedQueuesFromRq1) { - rq3.claimQueue(server1, queue); - } - rq3.removeReplicatorIfQueueIsEmpty(server1); - assertEquals(rq1.getListOfReplicators().size(), 1); - assertEquals(rq2.getListOfReplicators().size(), 1); - assertEquals(rq3.getListOfReplicators().size(), 1); - // Note that we do not pick up the queue: UnclaimableQueue which was not registered in - // Replication Peers - assertEquals(6, rq3.getAllQueues().size()); - // Test claiming non-existing queues - List noQueues = rq3.getUnClaimedQueueIds("NotARealServer"); - assertNull(noQueues); - assertEquals(6, rq3.getAllQueues().size()); - // Test claiming own queues - noQueues = rq3.getUnClaimedQueueIds(server3); - Assert.assertNull(noQueues); - assertEquals(6, rq3.getAllQueues().size()); - // Check that rq3 still remain on list of replicators - assertEquals(1, rq3.getListOfReplicators().size()); - } catch (ReplicationException e) { - fail("testClaimQueue threw a ReplicationException"); - } - } - - @Test - public void TestReplicationQueuesClient() throws Exception{ - - // Test ReplicationQueuesClient log tracking - rq1.addLog("Queue1", "WALLogFile1.1"); - assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size()); - rq1.removeLog("Queue1", "WALLogFile1.1"); - assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size()); - rq2.addLog("Queue2", "WALLogFile2.1"); - rq2.addLog("Queue2", "WALLogFile2.2"); - assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size()); - rq3.addLog("Queue1", "WALLogFile1.1"); - rq3.addLog("Queue3", "WALLogFile3.1"); - rq3.addLog("Queue3", "WALLogFile3.2"); - - // Test ReplicationQueueClient log tracking for faulty cases - assertEquals(0, ds0.getAbortCount()); - assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue")); - assertNull(rqc.getLogsInQueue(server1, "NotHereQueue")); - assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1")); - assertEquals(3, ds0.getAbortCount()); - // Test ReplicationQueueClient replicators - List replicators = rqc.getListOfReplicators(); - assertEquals(3, replicators.size()); - assertTrue(replicators.contains(server1)); - assertTrue(replicators.contains(server2)); - rq1.removeQueue("Queue1"); - assertEquals(2, rqc.getListOfReplicators().size()); - - // Test ReplicationQueuesClient queue tracking - assertEquals(0, rqc.getAllQueues(server1).size()); - rq1.addLog("Queue2", "WALLogFile2.1"); - rq1.addLog("Queue3", "WALLogFile3.1"); - assertEquals(2, rqc.getAllQueues(server1).size()); - rq1.removeAllQueues(); - assertEquals(0, rqc.getAllQueues(server1).size()); - - // Test ReplicationQueuesClient queue tracking for faulty cases - assertEquals(0, rqc.getAllQueues("NotHereServer").size()); - - // Test ReplicationQueuesClient get all WAL's - assertEquals(5 , rqc.getAllWALs().size()); - rq3.removeLog("Queue1", "WALLogFile1.1"); - assertEquals(4, rqc.getAllWALs().size()); - rq3.removeAllQueues(); - assertEquals(2, rqc.getAllWALs().size()); - rq2.removeAllQueues(); - assertEquals(0, rqc.getAllWALs().size()); - } - - @After - public void clearQueues() throws Exception{ - rq1.removeAllQueues(); - rq2.removeAllQueues(); - rq3.removeAllQueues(); - assertEquals(0, rq1.getAllQueues().size()); - assertEquals(0, rq2.getAllQueues().size()); - assertEquals(0, rq3.getAllQueues().size()); - ds0.resetAbortCount(); - ds1.resetAbortCount(); - ds2.resetAbortCount(); - ds3.resetAbortCount(); - } - - @After - public void tearDown() throws KeeperException, IOException { - ZKUtil.deleteNodeRecursively(zkw, replicationZNode); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - utility.shutdownMiniCluster(); - utility.shutdownMiniZKCluster(); - } - - static class DummyServer implements Server { - private String serverName; - private boolean isAborted = false; - private boolean isStopped = false; - private int abortCount = 0; - - public DummyServer(String serverName) { - this.serverName = serverName; - } - - @Override - public Configuration getConfiguration() { - return conf; - } - - @Override - public ZKWatcher getZooKeeper() { - return null; - } - - @Override - public CoordinatedStateManager getCoordinatedStateManager() { - return null; - } - - @Override - public ClusterConnection getConnection() { - return null; - } - - @Override - public MetaTableLocator getMetaTableLocator() { - return null; - } - - @Override - public ServerName getServerName() { - return ServerName.valueOf(this.serverName); - } - - @Override - public void abort(String why, Throwable e) { - abortCount++; - this.isAborted = true; - } - - @Override - public boolean isAborted() { - return this.isAborted; - } - - @Override - public void stop(String why) { - this.isStopped = true; - } - - @Override - public boolean isStopped() { - return this.isStopped; - } - - @Override - public ChoreService getChoreService() { - return null; - } - - @Override - public ClusterConnection getClusterConnection() { - return null; - } - - public int getAbortCount() { - return abortCount; - } - - public void resetAbortCount() { - abortCount = 0; - } - - @Override - public FileSystem getFileSystem() { - return null; - } - - @Override - public boolean isStopping() { - return false; - } - - @Override - public Connection createConnection(Configuration conf) throws IOException { - return null; - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java deleted file mode 100644 index 665eedb1dcb..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTableBase.java +++ /dev/null @@ -1,109 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -/** - * Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization - * should be non-blocking, but any method calls that access the table should be blocking. - */ -@Category({ReplicationTests.class, MediumTests.class}) -public class TestReplicationTableBase { - - private static long SLEEP_MILLIS = 5000; - private static long TIME_OUT_MILLIS = 3000; - private static Configuration conf; - private static HBaseTestingUtility utility; - private static ZKWatcher zkw; - private static ReplicationTableBase rb; - private static ReplicationQueues rq; - private static ReplicationQueuesClient rqc; - private volatile boolean asyncRequestSuccess = false; - - @Test - public void testSlowStartup() throws Exception{ - utility = new HBaseTestingUtility(); - utility.startMiniZKCluster(); - conf = utility.getConfiguration(); - conf.setClass("hbase.region.replica.replication.replicationQueues.class", - TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); - conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", - TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); - zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); - utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate() { - @Override - public boolean evaluate() throws Exception { - rb = new ReplicationTableBase(conf, zkw) {}; - rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments( - conf, zkw, zkw)); - rqc = ReplicationFactory.getReplicationQueuesClient( - new ReplicationQueuesClientArguments(conf, zkw, zkw)); - return true; - } - @Override - public String explainFailure() throws Exception { - return "Failed to initialize ReplicationTableBase, TableBasedReplicationQueuesClient and " + - "TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS + - " ms. Their initialization " + "should be non-blocking"; - } - }); - final RequestReplicationQueueData async = new RequestReplicationQueueData(); - async.start(); - Thread.sleep(SLEEP_MILLIS); - // Test that the Replication Table has not been assigned and the methods are blocking - assertFalse(rb.getInitializationStatus()); - assertFalse(asyncRequestSuccess); - utility.startMiniCluster(); - // Test that the methods do return the correct results after getting the table - utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate() { - @Override - public boolean evaluate() throws Exception { - async.join(); - return true; - } - @Override - public String explainFailure() throws Exception { - return "ReplicationQueue failed to return list of replicators even after Replication Table " - + "was initialized timeout=" + TIME_OUT_MILLIS + " ms"; - } - }); - assertTrue(asyncRequestSuccess); - } - - public class RequestReplicationQueueData extends Thread { - @Override - public void run() { - assertEquals(0, rq.getListOfReplicators().size()); - asyncRequestSuccess = true; - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java deleted file mode 100644 index 19457e2290f..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java +++ /dev/null @@ -1,63 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.replication.ReplicationQueues; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl; -import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -/** - * Tests the ReplicationSourceManager with TableBasedReplicationQueue's and - * TableBasedReplicationQueuesClient - */ -@Category({ReplicationTests.class, MediumTests.class}) -public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationSourceManager { - @BeforeClass - public static void setUpBeforeClass() throws Exception { - conf = HBaseConfiguration.create(); - conf.set("replication.replicationsource.implementation", - ReplicationSourceDummy.class.getCanonicalName()); - conf.setLong("replication.sleep.before.failover", 2000); - conf.setInt("replication.source.maxretriesmultiplier", 10); - - conf.setClass("hbase.region.replica.replication.replicationQueues.class", - TableBasedReplicationQueuesImpl.class, ReplicationQueues.class); - conf.setClass("hbase.region.replica.replication.replicationQueuesClient.class", - TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class); - utility = new HBaseTestingUtility(conf); - utility.startMiniCluster(); - Waiter.waitFor(conf, 3 * 1000, - () -> utility.getMiniHBaseCluster().getMaster().isInitialized()); - utility.waitUntilAllRegionsAssigned(TableName.valueOf( - NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication")); - setupZkAndReplication(); - } - -}