From 21e98271c32f0d44106515a72b2c92d518c03668 Mon Sep 17 00:00:00 2001 From: Joseph Hwang Date: Thu, 19 May 2016 17:14:33 -0700 Subject: [PATCH] HBASE-15883 Adding WAL files and tracking offsets in HBase. Implemented ReplicationQueuesHBaseImpl that tracks WAL offsets and replication queues in an HBase table. Only wrote the basic tracking methods, have not implemented claimQueue() or HFileRef methods yet. Wrote a basic unit test for ReplicationQueueHBaseImpl that tests the implemented functions on a single Region Server Signed-off-by: Elliott Clark Signed-off-by: Elliott Clark --- .../hbase/replication/ReplicationFactory.java | 11 +- .../hbase/replication/ReplicationQueues.java | 8 +- .../ReplicationQueuesArguments.java | 66 +++ .../ReplicationQueuesHBaseImpl.java | 491 ++++++++++++++++++ .../replication/ReplicationQueuesZKImpl.java | 13 +- .../replication/regionserver/Replication.java | 12 +- .../ReplicationSourceManager.java | 5 +- .../replication/TestReplicationAdmin.java | 3 +- .../hbase/master/cleaner/TestLogsCleaner.java | 3 +- .../cleaner/TestReplicationHFileCleaner.java | 4 +- .../TestReplicationStateBasic.java | 2 +- .../TestReplicationStateHBaseImpl.java | 243 +++++++++ .../TestReplicationStateZKImpl.java | 13 +- .../TestReplicationSourceManager.java | 36 +- .../hadoop/hbase/util/TestHBaseFsckOneRS.java | 4 +- 15 files changed, 871 insertions(+), 43 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 91e77ca7e44..e264a4d1f85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication; +import org.apache.commons.lang.reflect.ConstructorUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -30,9 +31,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @InterfaceAudience.Private public class ReplicationFactory { - public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk, - Configuration conf, Abortable abortable) { - return new ReplicationQueuesZKImpl(zk, conf, abortable); + public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) + throws Exception { + Class classToBuild = args.getConf().getClass("hbase.region.replica." + + "replication.ReplicationQueuesType", ReplicationQueuesZKImpl.class); + return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); } public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk, @@ -44,7 +47,7 @@ public class ReplicationFactory { Abortable abortable) { return getReplicationPeers(zk, conf, null, abortable); } - + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, final ReplicationQueuesClient queuesClient, Abortable abortable) { return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0d47a88cb4a..db6da9127a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -83,13 +83,13 @@ public interface ReplicationQueues { /** * Get a list of all WALs in the given queue. * @param queueId a String that identifies the queue - * @return a list of WALs, null if this region server is dead and has no outstanding queues + * @return a list of WALs, null if no such queue exists for this server */ List getLogsInQueue(String queueId); /** * Get a list of all queues for this region server. - * @return a list of queueIds, null if this region server is dead and has no outstanding queues + * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues */ List getAllQueues(); @@ -110,10 +110,10 @@ public interface ReplicationQueues { /** * Checks if the provided znode is the same as this region server's - * @param znode to check + * @param regionserver the id of the region server * @return if this is this rs's znode */ - boolean isThisOurZnode(String znode); + boolean isThisOurRegionServer(String regionserver); /** * Add a peer to hfile reference queue if peer does not exist. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java new file mode 100644 index 00000000000..4907b73d085 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +@InterfaceAudience.Private +public class ReplicationQueuesArguments { + + private ZooKeeperWatcher zk; + private Configuration conf; + private Abortable abort; + + public ReplicationQueuesArguments(Configuration conf, Abortable abort) { + this.conf = conf; + this.abort = abort; + } + + public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) { + this(conf, abort); + setZk(zk); + } + + public ZooKeeperWatcher getZk() { + return zk; + } + + public void setZk(ZooKeeperWatcher zk) { + this.zk = zk; + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Abortable getAbort() { + return abort; + } + + public void setAbort(Abortable abort) { + this.abort = abort; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java new file mode 100644 index 00000000000..bbc9e3267b6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java @@ -0,0 +1,491 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.SortedSet; + +@InterfaceAudience.Private +public class ReplicationQueuesHBaseImpl implements ReplicationQueues { + + /** Name of the HBase Table used for tracking replication*/ + public static final TableName REPLICATION_TABLE_NAME = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Column family and column names for the Replication Table + private static final byte[] CF = Bytes.toBytes("r"); + private static final byte[] COL_OWNER = Bytes.toBytes("o"); + private static final byte[] COL_QUEUE_ID = Bytes.toBytes("q"); + + // Column Descriptor for the Replication Table + private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR = + new HColumnDescriptor(CF).setMaxVersions(1) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // TODO: Figure out which bloom filter to use + .setBloomFilterType(BloomType.NONE) + .setCacheDataInL1(true); + + // Common byte values used in replication offset tracking + private static final byte[] INITIAL_OFFSET = Bytes.toBytes(0L); + + /* + * Make sure that HBase table operations for replication have a high number of retries. This is + * because the server is aborted if any HBase table operation fails. Each RPC will be attempted + * 3600 times before exiting. This provides each operation with 2 hours of retries + * before the server is aborted. + */ + private static final int CLIENT_RETRIES = 3600; + private static final int RPC_TIMEOUT = 2000; + private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT; + + private final Configuration conf; + private final Admin admin; + private final Connection connection; + private final Table replicationTable; + private final Abortable abortable; + private String serverName = null; + private byte[] serverNameBytes = null; + + public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) throws IOException { + this(args.getConf(), args.getAbort()); + } + + public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort) throws IOException { + this.conf = new Configuration(conf); + // Modify the connection's config so that the Replication Table it returns has a much higher + // number of client retries + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES); + this.connection = ConnectionFactory.createConnection(conf); + this.admin = connection.getAdmin(); + this.abortable = abort; + replicationTable = createAndGetReplicationTable(); + replicationTable.setRpcTimeout(RPC_TIMEOUT); + replicationTable.setOperationTimeout(OPERATION_TIMEOUT); + } + + @Override + public void init(String serverName) throws ReplicationException { + this.serverName = serverName; + this.serverNameBytes = Bytes.toBytes(serverName); + } + + @Override + public void removeQueue(String queueId) { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + // The rowkey will be null if the queue cannot be found in the Replication Table + if (rowKey == null) { + String errMsg = "Could not remove non-existent queue with queueId=" + queueId; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + Delete deleteQueue = new Delete(rowKey); + safeQueueUpdate(deleteQueue); + } catch (IOException e) { + abortable.abort("Could not remove queue with queueId=" + queueId, e); + } + } + + @Override + public void addLog(String queueId, String filename) throws ReplicationException { + try { + // Check if the queue info (Owner, QueueId) is currently stored in the Replication Table + if (this.queueIdToRowKey(queueId) == null) { + // Each queue will have an Owner, QueueId, and a collection of [WAL:offset] key values. + Put putNewQueue = new Put(Bytes.toBytes(buildServerQueueName(queueId))); + putNewQueue.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName)); + putNewQueue.addColumn(CF, COL_QUEUE_ID, Bytes.toBytes(queueId)); + putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET); + replicationTable.put(putNewQueue); + } else { + // Otherwise simply add the new log and offset as a new column + Put putNewLog = new Put(this.queueIdToRowKey(queueId)); + putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET); + safeQueueUpdate(putNewLog); + } + } catch (IOException e) { + abortable.abort("Could not add queue queueId=" + queueId + " filename=" + filename, e); + } + } + + @Override + public void removeLog(String queueId, String filename) { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + String errMsg = "Could not remove log from non-existent queueId=" + queueId + ", filename=" + + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + Delete delete = new Delete(rowKey); + delete.addColumns(CF, Bytes.toBytes(filename)); + safeQueueUpdate(delete); + } catch (IOException e) { + abortable.abort("Could not remove log from queueId=" + queueId + ", filename=" + filename, e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + String errMsg = "Could not set position of log from non-existent queueId=" + queueId + + ", filename=" + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + // Check that the log exists. addLog() must have been called before setLogPosition(). + Get checkLogExists = new Get(rowKey); + checkLogExists.addColumn(CF, Bytes.toBytes(filename)); + if (!replicationTable.exists(checkLogExists)) { + String errMsg = "Could not set position of non-existent log from queueId=" + queueId + + ", filename=" + filename; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return; + } + // Update the log offset if it exists + Put walAndOffset = new Put(rowKey); + walAndOffset.addColumn(CF, Bytes.toBytes(filename), Bytes.toBytes(position)); + safeQueueUpdate(walAndOffset); + } catch (IOException e) { + abortable.abort("Failed to write replication wal position (filename=" + filename + + ", position=" + position + ")", e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws ReplicationException { + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + throw new ReplicationException("Could not get position in log for non-existent queue " + + "queueId=" + queueId + ", filename=" + filename); + } + Get getOffset = new Get(rowKey); + getOffset.addColumn(CF, Bytes.toBytes(filename)); + Result result = replicationTable.get(getOffset); + if (result.isEmpty()) { + throw new ReplicationException("Could not read empty result while getting log position " + + "queueId=" + queueId + ", filename=" + filename); + } + return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename))); + } catch (IOException e) { + throw new ReplicationException("Could not get position in log for queueId=" + queueId + + ", filename=" + filename); + } + } + + @Override + public void removeAllQueues() { + List myQueueIds = getAllQueues(); + for (String queueId : myQueueIds) { + removeQueue(queueId); + } + } + + @Override + public List getLogsInQueue(String queueId) { + List logs = new ArrayList(); + try { + byte[] rowKey = this.queueIdToRowKey(queueId); + if (rowKey == null) { + String errMsg = "Could not get logs from non-existent queueId=" + queueId; + abortable.abort(errMsg, new ReplicationException(errMsg)); + return null; + } + Get getQueue = new Get(rowKey); + Result queue = replicationTable.get(getQueue); + if (queue.isEmpty()) { + return null; + } + Map familyMap = queue.getFamilyMap(CF); + for (byte[] cQualifier : familyMap.keySet()) { + if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_QUEUE_ID)) { + continue; + } + logs.add(Bytes.toString(cQualifier)); + } + } catch (IOException e) { + abortable.abort("Could not get logs from queue queueId=" + queueId, e); + return null; + } + return logs; + } + + @Override + public List getAllQueues() { + try { + return this.getQueuesBelongingToServer(serverName); + } catch (IOException e) { + abortable.abort("Could not get all replication queues", e); + return null; + } + } + + @Override + public SortedMap> claimQueues(String regionserver) { + // TODO + throw new NotImplementedException(); + } + + @Override + public List getListOfReplicators() { + // TODO + throw new NotImplementedException(); + } + + @Override + public boolean isThisOurRegionServer(String regionserver) { + return this.serverName.equals(regionserver); + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void addHFileRefs(String peerId, List files) throws ReplicationException { + // TODO + throw new NotImplementedException(); + } + + @Override + public void removeHFileRefs(String peerId, List files) { + // TODO + throw new NotImplementedException(); + } + + /** + * Gets the Replication Table. Builds and blocks until the table is available if the Replication + * Table does not exist. + * + * @return the Replication Table + * @throws IOException if the Replication Table takes too long to build + */ + private Table createAndGetReplicationTable() throws IOException { + if (!replicationTableExists()) { + createReplicationTable(); + } + int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100); + RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100); + RetryCounter retryCounter = counterFactory.create(); + while (!replicationTableExists()) { + try { + retryCounter.sleepUntilNextRetry(); + if (!retryCounter.shouldRetry()) { + throw new IOException("Unable to acquire the Replication Table"); + } + } catch (InterruptedException e) { + return null; + } + } + return connection.getTable(REPLICATION_TABLE_NAME); + } + + /** + * Checks whether the Replication Table exists yet + * + * @return whether the Replication Table exists + * @throws IOException + */ + private boolean replicationTableExists() { + try { + return admin.tableExists(REPLICATION_TABLE_NAME); + } catch (IOException e) { + return false; + } + } + + /** + * Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR + * in ReplicationQueuesHBaseImpl + * @throws IOException + */ + private void createReplicationTable() throws IOException { + HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME); + replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR); + admin.createTable(replicationTableDescriptor); + } + + /** + * Builds the unique identifier for a queue in the Replication table by appending the queueId to + * the servername + * + * @param queueId a String that identifies the queue + * @return unique identifier for a queue in the Replication table + */ + private String buildServerQueueName(String queueId) { + return serverName + "-" + queueId; + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * @param put Row mutation to perform on the queue + */ + private void safeQueueUpdate(Put put) { + RowMutations mutations = new RowMutations(put.getRow()); + try { + mutations.add(put); + } catch (IOException e){ + abortable.abort("Failed to update Replication Table because of IOException", e); + } + safeQueueUpdate(mutations); + } + + /** + * See safeQueueUpdate(RowMutations mutate) + * @param delete Row mutation to perform on the queue + */ + private void safeQueueUpdate(Delete delete) { + RowMutations mutations = new RowMutations(delete.getRow()); + try { + mutations.add(delete); + } catch (IOException e) { + abortable.abort("Failed to update Replication Table because of IOException", e); + } + safeQueueUpdate(mutations); + } + + /** + * Attempt to mutate a given queue in the Replication Table with a checkAndPut on the OWNER column + * of the queue. Abort the server if this checkAndPut fails: which means we have somehow lost + * ownership of the column or an IO Exception has occurred during the transaction. + * + * @param mutate Mutation to perform on a given queue + */ + private void safeQueueUpdate(RowMutations mutate) { + try { + boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER, + CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate); + if (!updateSuccess) { + String errMsg = "Failed to update Replication Table because we lost queue ownership"; + abortable.abort(errMsg, new ReplicationException(errMsg)); + } + } catch (IOException e) { + abortable.abort("Failed to update Replication Table because of IOException", e); + } + } + + /** + * Get the QueueIds belonging to the named server from the ReplicationTable + * + * @param server name of the server + * @return a list of the QueueIds belonging to the server + * @throws IOException + */ + private List getQueuesBelongingToServer(String server) throws IOException { + List queues = new ArrayList(); + Scan scan = new Scan(); + SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, COL_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server)); + scan.setFilter(filterMyQueues); + scan.addColumn(CF, COL_QUEUE_ID); + scan.addColumn(CF, COL_OWNER); + ResultScanner results = replicationTable.getScanner(scan); + for (Result result : results) { + queues.add(Bytes.toString(result.getValue(CF, COL_QUEUE_ID))); + } + results.close(); + return queues; + } + + /** + * Finds the row key of the HBase row corresponding to the provided queue. This has to be done, + * because the row key is [original server name + "-" + queueId0]. And the original server will + * make calls to getLog(), getQueue(), etc. with the argument queueId = queueId0. + * On the original server we can build the row key by concatenating servername + queueId0. + * Yet if the queue is claimed by another server, future calls to getLog(), getQueue(), etc. + * will be made with the argument queueId = queueId0 + "-" + pastOwner0 + "-" + pastOwner1 ... + * so we need a way to look up rows by their modified queueId's. + * + * TODO: Consider updating the queueId passed to getLog, getQueue()... inside of ReplicationSource + * TODO: and ReplicationSourceManager or the parsing of the passed in queueId's so that we don't + * TODO have to scan the table for row keys for each update. See HBASE-15956. + * + * TODO: We can also cache queueId's if ReplicationQueuesHBaseImpl becomes a bottleneck. We + * TODO: currently perform scan's over all the rows looking for one with a matching QueueId. + * + * @param queueId string representation of the queue id + * @return the rowkey of the corresponding queue. This returns null if the corresponding queue + * cannot be found. + * @throws IOException + */ + private byte[] queueIdToRowKey(String queueId) throws IOException { + Scan scan = new Scan(); + scan.addColumn(CF, COL_QUEUE_ID); + scan.addColumn(CF, COL_OWNER); + scan.setMaxResultSize(1); + // Search for the queue that matches this queueId + SingleColumnValueFilter filterByQueueId = new SingleColumnValueFilter(CF, COL_QUEUE_ID, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(queueId)); + // Make sure that we are the owners of the queue. QueueId's may overlap. + SingleColumnValueFilter filterByOwner = new SingleColumnValueFilter(CF, COL_OWNER, + CompareFilter.CompareOp.EQUAL, Bytes.toBytes(serverName)); + // We only want the row key + FirstKeyOnlyFilter filterOutColumns = new FirstKeyOnlyFilter(); + FilterList filterList = new FilterList(filterByQueueId, filterByOwner, filterOutColumns); + scan.setFilter(filterList); + ResultScanner results = replicationTable.getScanner(scan); + Result result = results.next(); + results.close(); + return (result == null) ? null : result.getRow(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 2bb8ea82448..32d0883dbe6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -41,7 +41,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; /** - * This class provides an implementation of the ReplicationQueues interface using ZooKeeper. The + * This class provides an implementation of the + * interface using ZooKeeper. The * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is * the regionserver name (a concatenation of the region server’s hostname, client port and start @@ -71,6 +72,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); + public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) { + this(args.getZk(), args.getConf(), args.getAbort()); + } + public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); @@ -166,8 +171,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public boolean isThisOurZnode(String znode) { - return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode); + public boolean isThisOurRegionServer(String regionserver) { + return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode); } @Override @@ -223,7 +228,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R this.abortable.abort("Failed to get a list of queues for region server: " + this.myQueuesZnode, e); } - return listOfQueues; + return listOfQueues == null ? new ArrayList() : listOfQueues; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index fa5e2220e3c..d55472d31da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -48,16 +48,17 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationTracker; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; @@ -127,7 +128,8 @@ public class Replication extends WALActionsListener.Base implements if (replication) { try { this.replicationQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server, + server.getZooKeeper())); this.replicationQueues.init(this.server.getServerName().toString()); this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); @@ -135,7 +137,7 @@ public class Replication extends WALActionsListener.Base implements this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, this.conf, this.server, this.server); - } catch (ReplicationException e) { + } catch (Exception e) { throw new IOException("Failed replication handler create", e); } UUID clusterId = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b585513468b..ed2eeccfe7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -315,9 +315,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void join() { this.executor.shutdown(); - if (this.sources.size() == 0) { - this.replicationQueues.removeAllQueues(); - } for (ReplicationSourceInterface source : this.sources) { source.terminate("Region server is closing"); } @@ -624,7 +621,7 @@ public class ReplicationSourceManager implements ReplicationListener { @Override public void run() { - if (this.rq.isThisOurZnode(rsZnode)) { + if (this.rq.isThisOurRegionServer(rsZnode)) { return; } // Wait a bit before transferring the queues, we may be shutting down. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index c3241c961df..06a3c7ee53d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -160,7 +161,7 @@ public class TestReplicationAdmin { Configuration conf = TEST_UTIL.getConfiguration(); ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(zkw, conf, null); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw)); repQueues.init("server1"); // add queue for ID_ONE diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 47db32b0c16..18950a24d92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; @@ -94,7 +95,7 @@ public class TestLogsCleaner { Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); repQueues.init(server.getServerName().toString()); final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index d4f23c8a70f..1778e73395f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; @@ -87,8 +88,7 @@ public class TestReplicationHFileCleaner { Replication.decorateMasterConfiguration(conf); rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server); rp.init(); - - rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper())); rq.init(server.getServerName().toString()); try { fs = FileSystem.get(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 144046f40a8..5ab26ab1549 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -121,7 +121,7 @@ public abstract class TestReplicationStateBasic { rq1.removeQueue("bogus"); rq1.removeLog("bogus", "bogus"); rq1.removeAllQueues(); - assertNull(rq1.getAllQueues()); + assertEquals(0, rq1.getAllQueues().size()); assertEquals(0, rq1.getLogPosition("bogus", "bogus")); assertNull(rq1.getLogsInQueue("bogus")); assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java new file mode 100644 index 00000000000..81862135832 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static junit.framework.TestCase.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({ReplicationTests.class, MediumTests.class}) +public class TestReplicationStateHBaseImpl { + + private static Configuration conf; + private static HBaseTestingUtility utility; + private static Connection connection; + private static ReplicationQueues rqH; + + private final String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniCluster(); + conf = utility.getConfiguration(); + conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", + ReplicationQueuesHBaseImpl.class, ReplicationQueues.class); + connection = ConnectionFactory.createConnection(conf); + } + + @Test + public void checkNamingSchema() throws Exception { + rqH.init(server1); + assertTrue(rqH.isThisOurRegionServer(server1)); + assertTrue(!rqH.isThisOurRegionServer(server1 + "a")); + assertTrue(!rqH.isThisOurRegionServer(null)); + } + + @Test + public void testReplicationStateHBase() { + DummyServer ds = new DummyServer(server1); + try { + rqH = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds, null)); + rqH.init(server1); + // Check that the proper System Tables have been generated + Table replicationTable = connection.getTable( + ReplicationQueuesHBaseImpl.REPLICATION_TABLE_NAME); + assertTrue(replicationTable.getName().isSystemTable()); + + } catch (Exception e) { + e.printStackTrace(); + fail("testReplicationStateHBaseConstruction received an Exception"); + } + try { + // Test adding in WAL files + assertEquals(0, rqH.getAllQueues().size()); + rqH.addLog("Queue1", "WALLogFile1.1"); + assertEquals(1, rqH.getAllQueues().size()); + rqH.addLog("Queue1", "WALLogFile1.2"); + rqH.addLog("Queue1", "WALLogFile1.3"); + rqH.addLog("Queue1", "WALLogFile1.4"); + rqH.addLog("Queue2", "WALLogFile2.1"); + rqH.addLog("Queue3", "WALLogFile3.1"); + assertEquals(3, rqH.getAllQueues().size()); + assertEquals(4, rqH.getLogsInQueue("Queue1").size()); + assertEquals(1, rqH.getLogsInQueue("Queue2").size()); + assertEquals(1, rqH.getLogsInQueue("Queue3").size()); + // Make sure that abortCount is still 0 + assertEquals(0, ds.getAbortCount()); + // Make sure that getting a log from a non-existent queue triggers an abort + assertNull(rqH.getLogsInQueue("Queue4")); + assertEquals(1, ds.getAbortCount()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testAddLog received a ReplicationException"); + } + try { + + // Test updating the log positions + assertEquals(0L, rqH.getLogPosition("Queue1", "WALLogFile1.1")); + rqH.setLogPosition("Queue1", "WALLogFile1.1", 123L); + assertEquals(123L, rqH.getLogPosition("Queue1", "WALLogFile1.1")); + rqH.setLogPosition("Queue1", "WALLogFile1.1", 123456789L); + assertEquals(123456789L, rqH.getLogPosition("Queue1", "WALLogFile1.1")); + rqH.setLogPosition("Queue2", "WALLogFile2.1", 242L); + assertEquals(242L, rqH.getLogPosition("Queue2", "WALLogFile2.1")); + rqH.setLogPosition("Queue3", "WALLogFile3.1", 243L); + assertEquals(243L, rqH.getLogPosition("Queue3", "WALLogFile3.1")); + + // Test that setting log positions in non-existing logs will cause an abort + assertEquals(1, ds.getAbortCount()); + rqH.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L); + assertEquals(2, ds.getAbortCount()); + rqH.setLogPosition("NotHereQueue", "NotHereFile", 243L); + assertEquals(3, ds.getAbortCount()); + rqH.setLogPosition("Queue1", "NotHereFile", 243l); + assertEquals(4, ds.getAbortCount()); + + // Test reading log positions for non-existent queues and WAL's + try { + rqH.getLogPosition("Queue1", "NotHereWAL"); + fail("Replication queue should have thrown a ReplicationException for reading from a " + + "non-existent WAL"); + } catch (ReplicationException e) { + } + try { + rqH.getLogPosition("NotHereQueue", "NotHereWAL"); + fail("Replication queue should have thrown a ReplicationException for reading from a " + + "non-existent queue"); + } catch (ReplicationException e) { + } + // Test removing logs + rqH.removeLog("Queue1", "WALLogFile1.1"); + assertEquals(3, rqH.getLogsInQueue("Queue1").size()); + // Test removing queues + rqH.removeQueue("Queue2"); + assertEquals(2, rqH.getAllQueues().size()); + assertNull(rqH.getLogsInQueue("Queue2")); + // Test that getting logs from a non-existent queue aborts + assertEquals(5, ds.getAbortCount()); + // Test removing all queues for a Region Server + rqH.removeAllQueues(); + assertEquals(0, rqH.getAllQueues().size()); + assertNull(rqH.getLogsInQueue("Queue1")); + // Test that getting logs from a non-existent queue aborts + assertEquals(6, ds.getAbortCount()); + } catch (ReplicationException e) { + e.printStackTrace(); + fail("testAddLog received a ReplicationException"); + } + } + + static class DummyServer implements Server { + private String serverName; + private boolean isAborted = false; + private boolean isStopped = false; + private int abortCount = 0; + + public DummyServer(String serverName) { + this.serverName = serverName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return null; + } + + @Override + public ClusterConnection getConnection() { + return null; + } + + @Override + public MetaTableLocator getMetaTableLocator() { + return null; + } + + @Override + public ServerName getServerName() { + return ServerName.valueOf(this.serverName); + } + + @Override + public void abort(String why, Throwable e) { + abortCount++; + this.isAborted = true; + } + + @Override + public boolean isAborted() { + return this.isAborted; + } + + @Override + public void stop(String why) { + this.isStopped = true; + } + + @Override + public boolean isStopped() { + return this.isStopped; + } + + @Override + public ChoreService getChoreService() { + return null; + } + + @Override + public ClusterConnection getClusterConnection() { + return null; + } + + public int getAbortCount() { + return abortCount; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 94dbb2523b2..e7311351519 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -91,9 +93,14 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { DummyServer ds1 = new DummyServer(server1); DummyServer ds2 = new DummyServer(server2); DummyServer ds3 = new DummyServer(server3); - rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1); - rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2); - rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3); + try { + rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw)); + rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw)); + rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw)); + } catch (Exception e) { + // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl + fail("ReplicationFactory.getReplicationQueues() threw an IO Exception"); + } rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 9e950d2bec4..d1db06803fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; @@ -284,9 +285,11 @@ public class TestReplicationSourceManager { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); + + ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); @@ -326,8 +329,8 @@ public class TestReplicationSourceManager { public void testCleanupFailoverQueues() throws Exception { final Server server = new DummyServer("hostname1.example.org"); ReplicationQueues rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); rq.init(server.getServerName().toString()); // populate some znodes in the peer znode SortedSet files = new TreeSet(); @@ -341,7 +344,8 @@ public class TestReplicationSourceManager { } Server s1 = new DummyServer("dummyserver1.example.org"); ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); ReplicationPeers rp1 = ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1); @@ -365,7 +369,8 @@ public class TestReplicationSourceManager { conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, + server.getZooKeeper())); repQueues.init(server.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); @@ -381,16 +386,19 @@ public class TestReplicationSourceManager { // simulate three servers fail sequentially ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); SortedMap> testMap = rq1.claimQueues(server.getServerName().getServerName()); ReplicationQueues rq2 = - ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2, + s2.getZooKeeper())); rq2.init(s2.getServerName().toString()); testMap = rq2.claimQueues(s1.getServerName().getServerName()); ReplicationQueues rq3 = - ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3, + s3.getZooKeeper())); rq3.init(s3.getServerName().toString()); testMap = rq3.claimQueues(s2.getServerName().getServerName()); @@ -412,7 +420,8 @@ public class TestReplicationSourceManager { conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server s0 = new DummyServer("cversion-change0.example.org"); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0, + s0.getZooKeeper())); repQueues.init(s0.getServerName().toString()); // populate some znodes in the peer znode files.add("log1"); @@ -423,7 +432,8 @@ public class TestReplicationSourceManager { // simulate queue transfer Server s1 = new DummyServer("cversion-change1.example.org"); ReplicationQueues rq1 = - ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1, + s1.getZooKeeper())); rq1.init(s1.getServerName().toString()); ReplicationQueuesClient client = @@ -522,8 +532,8 @@ public class TestReplicationSourceManager { this.deadRsZnode = znode; this.server = s; this.rq = - ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(), - server); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server, + server.getZooKeeper())); this.rq.init(this.server.getServerName().toString()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java index 2140b391c34..84ef6dad354 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckOneRS.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker; @@ -1543,7 +1544,8 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck { // create replicator ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection); ReplicationQueues repQueues = - ReplicationFactory.getReplicationQueues(zkw, conf, connection); + ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, connection, + zkw)); repQueues.init("server1"); // queues for current peer, no errors repQueues.addLog("1", "file1");