HBASE-15883 Adding WAL files and tracking offsets in HBase.

Implemented ReplicationQueuesHBaseImpl, which tracks WAL offsets and replication queues in an HBase table.
Only the basic tracking methods are implemented; claimQueues() and the HFileRef methods are not yet.
Wrote a basic unit test for ReplicationQueuesHBaseImpl that tests the implemented methods on a single region server.
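For reference, a minimal sketch (not part of the commit) of how a caller opts into the new implementation; the config key, factory method, and class names are taken from the diff below, while abortable, zkWatcher, and serverName stand in for caller-supplied values, and exception handling is omitted:

Configuration conf = HBaseConfiguration.create();
// Select the HBase-backed queue tracking instead of the default ZooKeeper implementation
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
    ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
ReplicationQueues queues = ReplicationFactory.getReplicationQueues(
    new ReplicationQueuesArguments(conf, abortable, zkWatcher));
queues.init(serverName);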

Signed-off-by: Elliott Clark <elliott@fb.com>
Signed-off-by: Elliott Clark <eclark@apache.org>
Joseph Hwang 2016-05-19 17:14:33 -07:00 committed by Elliott Clark
parent 9a53d8b385
commit 21e98271c3
15 changed files with 871 additions and 43 deletions

View File

@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.lang.reflect.ConstructorUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@@ -30,9 +31,11 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@InterfaceAudience.Private
public class ReplicationFactory {
public static ReplicationQueues getReplicationQueues(final ZooKeeperWatcher zk,
Configuration conf, Abortable abortable) {
return new ReplicationQueuesZKImpl(zk, conf, abortable);
public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
throws Exception {
Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
"replication.ReplicationQueuesType", ReplicationQueuesZKImpl.class);
return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
}
public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk,
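Because the factory now builds the configured class reflectively through ConstructorUtils.invokeConstructor, the only structural requirement on an implementation is a public constructor that takes a single ReplicationQueuesArguments. A hedged sketch, with CustomQueuesImpl as a hypothetical implementation:

public class CustomQueuesImpl extends ReplicationQueuesZKImpl {
  // This one-argument constructor is what ConstructorUtils.invokeConstructor resolves
  public CustomQueuesImpl(ReplicationQueuesArguments args) {
    super(args);
  }
}

It would be registered the same way the tests in this commit register the HBase-backed implementation, via conf.setClass("hbase.region.replica.replication.ReplicationQueuesType", CustomQueuesImpl.class, ReplicationQueues.class).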

View File

@@ -83,13 +83,13 @@ public interface ReplicationQueues {
/**
* Get a list of all WALs in the given queue.
* @param queueId a String that identifies the queue
* @return a list of WALs, null if this region server is dead and has no outstanding queues
* @return a list of WALs, null if no such queue exists for this server
*/
List<String> getLogsInQueue(String queueId);
/**
* Get a list of all queues for this region server.
* @return a list of queueIds, null if this region server is dead and has no outstanding queues
* @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues
*/
List<String> getAllQueues();
@@ -110,10 +110,10 @@ public interface ReplicationQueues {
/**
* Checks if the provided znode is the same as this region server's
* @param znode to check
* @param regionserver the id of the region server
* @return if this is this rs's znode
*/
boolean isThisOurZnode(String znode);
boolean isThisOurRegionServer(String regionserver);
/**
* Add a peer to hfile reference queue if peer does not exist.
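Note the asymmetry in the revised contracts: getAllQueues() now returns an empty list for a dead server with no outstanding queues, while getLogsInQueue() still returns null when the queue does not exist. An illustrative caller-side sketch (replicationQueues assumed already initialized):

List<String> queues = replicationQueues.getAllQueues();
// Per the revised javadoc, an empty list rather than null when there are no queues
for (String queueId : queues) {
  List<String> logs = replicationQueues.getLogsInQueue(queueId);
  if (logs == null) {
    continue;  // the queue vanished between the two calls, e.g. it was claimed or removed
  }
  // process the WALs in this queue ...
}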

View File

@@ -0,0 +1,66 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@InterfaceAudience.Private
public class ReplicationQueuesArguments {
private ZooKeeperWatcher zk;
private Configuration conf;
private Abortable abort;
public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
this.conf = conf;
this.abort = abort;
}
public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) {
this(conf, abort);
setZk(zk);
}
public ZooKeeperWatcher getZk() {
return zk;
}
public void setZk(ZooKeeperWatcher zk) {
this.zk = zk;
}
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Abortable getAbort() {
return abort;
}
public void setAbort(Abortable abort) {
this.abort = abort;
}
}
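ReplicationQueuesArguments simply bundles the parameters that the factory overloads previously took separately; call sites throughout this commit construct it as, for example:

ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
    new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
rq.init(server.getServerName().toString());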

View File

@@ -0,0 +1,491 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
@InterfaceAudience.Private
public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
/** Name of the HBase Table used for tracking replication */
public static final TableName REPLICATION_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
// Column family and column names for the Replication Table
private static final byte[] CF = Bytes.toBytes("r");
private static final byte[] COL_OWNER = Bytes.toBytes("o");
private static final byte[] COL_QUEUE_ID = Bytes.toBytes("q");
// Column Descriptor for the Replication Table
private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
new HColumnDescriptor(CF).setMaxVersions(1)
.setInMemory(true)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// TODO: Figure out which bloom filter to use
.setBloomFilterType(BloomType.NONE)
.setCacheDataInL1(true);
// Common byte values used in replication offset tracking
private static final byte[] INITIAL_OFFSET = Bytes.toBytes(0L);
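// Illustrative row layout, drawn from the schema above (an assumption, not normative):
//   rowkey: <serverName> + "-" + <queueId>   (see buildServerQueueName())
//   r:o -> name of the region server that owns the queue
//   r:q -> the queueId
//   r:<WAL file name> -> replication offset into that WAL, stored as an 8-byte long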
/*
* Make sure that HBase table operations for replication have a high number of retries. This is
* because the server is aborted if any HBase table operation fails. Each operation is retried
* up to 3600 times; with a 2 second RPC timeout, that gives each operation roughly 2 hours of
* retries before the server is aborted.
*/
private static final int CLIENT_RETRIES = 3600;
private static final int RPC_TIMEOUT = 2000;
private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
private final Configuration conf;
private final Admin admin;
private final Connection connection;
private final Table replicationTable;
private final Abortable abortable;
private String serverName = null;
private byte[] serverNameBytes = null;
public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) throws IOException {
this(args.getConf(), args.getAbort());
}
public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort) throws IOException {
this.conf = new Configuration(conf);
// Modify our copy of the config (not the caller's) so that the connection used for the
// Replication Table has a much higher number of client retries
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
this.connection = ConnectionFactory.createConnection(this.conf);
this.admin = connection.getAdmin();
this.abortable = abort;
replicationTable = createAndGetReplicationTable();
replicationTable.setRpcTimeout(RPC_TIMEOUT);
replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
}
@Override
public void init(String serverName) throws ReplicationException {
this.serverName = serverName;
this.serverNameBytes = Bytes.toBytes(serverName);
}
@Override
public void removeQueue(String queueId) {
try {
byte[] rowKey = this.queueIdToRowKey(queueId);
// The rowkey will be null if the queue cannot be found in the Replication Table
if (rowKey == null) {
String errMsg = "Could not remove non-existent queue with queueId=" + queueId;
abortable.abort(errMsg, new ReplicationException(errMsg));
return;
}
Delete deleteQueue = new Delete(rowKey);
safeQueueUpdate(deleteQueue);
} catch (IOException e) {
abortable.abort("Could not remove queue with queueId=" + queueId, e);
}
}
@Override
public void addLog(String queueId, String filename) throws ReplicationException {
try {
// Check if the queue info (Owner, QueueId) is currently stored in the Replication Table
if (this.queueIdToRowKey(queueId) == null) {
// Each queue will have an Owner, QueueId, and a collection of [WAL:offset] key values.
Put putNewQueue = new Put(Bytes.toBytes(buildServerQueueName(queueId)));
putNewQueue.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName));
putNewQueue.addColumn(CF, COL_QUEUE_ID, Bytes.toBytes(queueId));
putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET);
replicationTable.put(putNewQueue);
} else {
// Otherwise simply add the new log and offset as a new column
Put putNewLog = new Put(this.queueIdToRowKey(queueId));
putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET);
safeQueueUpdate(putNewLog);
}
} catch (IOException e) {
abortable.abort("Could not add queue queueId=" + queueId + " filename=" + filename, e);
}
}
@Override
public void removeLog(String queueId, String filename) {
try {
byte[] rowKey = this.queueIdToRowKey(queueId);
if (rowKey == null) {
String errMsg = "Could not remove log from non-existent queueId=" + queueId + ", filename="
+ filename;
abortable.abort(errMsg, new ReplicationException(errMsg));
return;
}
Delete delete = new Delete(rowKey);
delete.addColumns(CF, Bytes.toBytes(filename));
safeQueueUpdate(delete);
} catch (IOException e) {
abortable.abort("Could not remove log from queueId=" + queueId + ", filename=" + filename, e);
}
}
@Override
public void setLogPosition(String queueId, String filename, long position) {
try {
byte[] rowKey = this.queueIdToRowKey(queueId);
if (rowKey == null) {
String errMsg = "Could not set position of log from non-existent queueId=" + queueId +
", filename=" + filename;
abortable.abort(errMsg, new ReplicationException(errMsg));
return;
}
// Check that the log exists. addLog() must have been called before setLogPosition().
Get checkLogExists = new Get(rowKey);
checkLogExists.addColumn(CF, Bytes.toBytes(filename));
if (!replicationTable.exists(checkLogExists)) {
String errMsg = "Could not set position of non-existent log from queueId=" + queueId +
", filename=" + filename;
abortable.abort(errMsg, new ReplicationException(errMsg));
return;
}
// Update the log offset if it exists
Put walAndOffset = new Put(rowKey);
walAndOffset.addColumn(CF, Bytes.toBytes(filename), Bytes.toBytes(position));
safeQueueUpdate(walAndOffset);
} catch (IOException e) {
abortable.abort("Failed to write replication wal position (filename=" + filename +
", position=" + position + ")", e);
}
}
@Override
public long getLogPosition(String queueId, String filename) throws ReplicationException {
try {
byte[] rowKey = this.queueIdToRowKey(queueId);
if (rowKey == null) {
throw new ReplicationException("Could not get position in log for non-existent queue " +
"queueId=" + queueId + ", filename=" + filename);
}
Get getOffset = new Get(rowKey);
getOffset.addColumn(CF, Bytes.toBytes(filename));
Result result = replicationTable.get(getOffset);
if (result.isEmpty()) {
throw new ReplicationException("Could not read empty result while getting log position " +
"queueId=" + queueId + ", filename=" + filename);
}
return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename)));
} catch (IOException e) {
throw new ReplicationException("Could not get position in log for queueId=" + queueId +
", filename=" + filename);
}
}
@Override
public void removeAllQueues() {
List<String> myQueueIds = getAllQueues();
for (String queueId : myQueueIds) {
removeQueue(queueId);
}
}
@Override
public List<String> getLogsInQueue(String queueId) {
List<String> logs = new ArrayList<String>();
try {
byte[] rowKey = this.queueIdToRowKey(queueId);
if (rowKey == null) {
String errMsg = "Could not get logs from non-existent queueId=" + queueId;
abortable.abort(errMsg, new ReplicationException(errMsg));
return null;
}
Get getQueue = new Get(rowKey);
Result queue = replicationTable.get(getQueue);
if (queue.isEmpty()) {
return null;
}
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF);
for (byte[] cQualifier : familyMap.keySet()) {
if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_QUEUE_ID)) {
continue;
}
logs.add(Bytes.toString(cQualifier));
}
} catch (IOException e) {
abortable.abort("Could not get logs from queue queueId=" + queueId, e);
return null;
}
return logs;
}
@Override
public List<String> getAllQueues() {
try {
return this.getQueuesBelongingToServer(serverName);
} catch (IOException e) {
abortable.abort("Could not get all replication queues", e);
return null;
}
}
@Override
public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
// TODO
throw new NotImplementedException();
}
@Override
public List<String> getListOfReplicators() {
// TODO
throw new NotImplementedException();
}
@Override
public boolean isThisOurRegionServer(String regionserver) {
return this.serverName.equals(regionserver);
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
// TODO
throw new NotImplementedException();
}
@Override
public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
// TODO
throw new NotImplementedException();
}
@Override
public void removeHFileRefs(String peerId, List<String> files) {
// TODO
throw new NotImplementedException();
}
/**
* Gets the Replication Table. If the Replication Table does not exist yet, this builds it and
* blocks until it is available.
*
* @return the Replication Table
* @throws IOException if the Replication Table takes too long to build
*/
private Table createAndGetReplicationTable() throws IOException {
if (!replicationTableExists()) {
createReplicationTable();
}
int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100);
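// Poll until the table exists, sleeping between attempts, up to maxRetries tries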
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100);
RetryCounter retryCounter = counterFactory.create();
while (!replicationTableExists()) {
try {
retryCounter.sleepUntilNextRetry();
if (!retryCounter.shouldRetry()) {
throw new IOException("Unable to acquire the Replication Table");
}
} catch (InterruptedException e) {
// Don't swallow the interrupt and hand back a null Table; fail the construction instead
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for the Replication Table", e);
}
}
return connection.getTable(REPLICATION_TABLE_NAME);
}
/**
* Checks whether the Replication Table exists yet, returning false if the check fails with an
* IOException
*
* @return whether the Replication Table exists
*/
private boolean replicationTableExists() {
try {
return admin.tableExists(REPLICATION_TABLE_NAME);
} catch (IOException e) {
return false;
}
}
/**
* Creates the Replication Table with the column family described by REPLICATION_COL_DESCRIPTOR
* @throws IOException
*/
private void createReplicationTable() throws IOException {
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
admin.createTable(replicationTableDescriptor);
}
/**
* Builds the unique identifier for a queue in the Replication table by appending the queueId to
* the server name
*
* @param queueId a String that identifies the queue
* @return unique identifier for a queue in the Replication table
*/
private String buildServerQueueName(String queueId) {
return serverName + "-" + queueId;
}
/**
* See safeQueueUpdate(RowMutations mutate)
* @param put Row mutation to perform on the queue
*/
private void safeQueueUpdate(Put put) {
RowMutations mutations = new RowMutations(put.getRow());
try {
mutations.add(put);
} catch (IOException e){
abortable.abort("Failed to update Replication Table because of IOException", e);
}
safeQueueUpdate(mutations);
}
/**
* See safeQueueUpdate(RowMutations mutate)
* @param delete Row mutation to perform on the queue
*/
private void safeQueueUpdate(Delete delete) {
RowMutations mutations = new RowMutations(delete.getRow());
try {
mutations.add(delete);
} catch (IOException e) {
abortable.abort("Failed to update Replication Table because of IOException", e);
}
safeQueueUpdate(mutations);
}
/**
* Attempt to mutate a given queue in the Replication Table with a checkAndMutate on the OWNER
* column of the queue. Abort the server if this checkAndMutate fails, which means we have
* somehow lost ownership of the queue or an IOException occurred during the update.
*
* @param mutate Mutation to perform on a given queue
*/
private void safeQueueUpdate(RowMutations mutate) {
try {
boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER,
CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
if (!updateSuccess) {
String errMsg = "Failed to update Replication Table because we lost queue ownership";
abortable.abort(errMsg, new ReplicationException(errMsg));
}
} catch (IOException e) {
abortable.abort("Failed to update Replication Table because of IOException", e);
}
}
/**
* Get the QueueIds belonging to the named server from the ReplicationTable
*
* @param server name of the server
* @return a list of the QueueIds belonging to the server
* @throws IOException
*/
private List<String> getQueuesBelongingToServer(String server) throws IOException {
List<String> queues = new ArrayList<String>();
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, COL_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF, COL_QUEUE_ID);
scan.addColumn(CF, COL_OWNER);
ResultScanner results = replicationTable.getScanner(scan);
for (Result result : results) {
queues.add(Bytes.toString(result.getValue(CF, COL_QUEUE_ID)));
}
results.close();
return queues;
}
/**
* Finds the row key of the HBase row corresponding to the provided queue. This lookup is
* necessary because the row key is [original server name + "-" + queueId0], and the original
* server makes calls to getLog(), getQueue(), etc. with the argument queueId = queueId0.
* On the original server we can build the row key by concatenating serverName + queueId0.
* But if the queue is claimed by another server, future calls to getLog(), getQueue(), etc.
* are made with the argument queueId = queueId0 + "-" + pastOwner0 + "-" + pastOwner1 ...,
* so we need a way to look up rows by their modified queueId's.
*
* TODO: Consider updating the queueId passed to getLog(), getQueue()... inside of
* TODO: ReplicationSource and ReplicationSourceManager, or the parsing of the passed-in
* TODO: queueId's, so that we don't have to scan the table for row keys on each update.
* TODO: See HBASE-15956.
*
* TODO: We can also cache queueId's if ReplicationQueuesHBaseImpl becomes a bottleneck. We
* TODO: currently perform scans over all the rows looking for one with a matching QueueId.
*
* @param queueId string representation of the queue id
* @return the rowkey of the corresponding queue. This returns null if the corresponding queue
* cannot be found.
* @throws IOException
*/
private byte[] queueIdToRowKey(String queueId) throws IOException {
Scan scan = new Scan();
scan.addColumn(CF, COL_QUEUE_ID);
scan.addColumn(CF, COL_OWNER);
scan.setMaxResultSize(1);
// Search for the queue that matches this queueId
SingleColumnValueFilter filterByQueueId = new SingleColumnValueFilter(CF, COL_QUEUE_ID,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(queueId));
// Make sure that we are the owners of the queue. QueueId's may overlap.
SingleColumnValueFilter filterByOwner = new SingleColumnValueFilter(CF, COL_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(serverName));
// We only want the row key
FirstKeyOnlyFilter filterOutColumns = new FirstKeyOnlyFilter();
FilterList filterList = new FilterList(filterByQueueId, filterByOwner, filterOutColumns);
scan.setFilter(filterList);
ResultScanner results = replicationTable.getScanner(scan);
Result result = results.next();
results.close();
return (result == null) ? null : result.getRow();
}
}
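The checkAndMutate-on-OWNER call above is this commit's core concurrency guard: a queue row is only mutated while this server still owns it. A standalone sketch of the same pattern, using the names from this file (the Put contents are hypothetical and exception handling is omitted):

Put put = new Put(rowKey);
put.addColumn(CF, Bytes.toBytes("WALLogFile1.1"), Bytes.toBytes(42L));
RowMutations mutations = new RowMutations(rowKey);
mutations.add(put);
// Applied atomically only if r:o still equals this server's name;
// false means another server has claimed the queue and we must abort.
boolean stillOwner = replicationTable.checkAndMutate(rowKey, CF, COL_OWNER,
    CompareFilter.CompareOp.EQUAL, serverNameBytes, mutations);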

View File

@@ -41,7 +41,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* This class provides an implementation of the ReplicationQueues interface using ZooKeeper. The
* This class provides an implementation of the ReplicationQueues
* interface using ZooKeeper. The
* base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
* all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
* the regionserver name (a concatenation of the region servers hostname, client port and start
@@ -71,6 +72,10 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
this(args.getZk(), args.getConf(), args.getAbort());
}
public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
Abortable abortable) {
super(zk, conf, abortable);
@@ -166,8 +171,8 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
@Override
public boolean isThisOurZnode(String znode) {
return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
public boolean isThisOurRegionServer(String regionserver) {
return ZKUtil.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
}
@Override
@@ -223,7 +228,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
this.abortable.abort("Failed to get a list of queues for region server: "
+ this.myQueuesZnode, e);
}
return listOfQueues;
return listOfQueues == null ? new ArrayList<String>() : listOfQueues;
}
/**

View File

@@ -48,16 +48,17 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;
@@ -127,7 +128,8 @@ public class Replication extends WALActionsListener.Base implements
if (replication) {
try {
this.replicationQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
server.getZooKeeper()));
this.replicationQueues.init(this.server.getServerName().toString());
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
@@ -135,7 +137,7 @@
this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
this.conf, this.server, this.server);
} catch (ReplicationException e) {
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
}
UUID clusterId = null;

View File

@@ -315,9 +315,6 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public void join() {
this.executor.shutdown();
if (this.sources.size() == 0) {
this.replicationQueues.removeAllQueues();
}
for (ReplicationSourceInterface source : this.sources) {
source.terminate("Region server is closing");
}
@@ -624,7 +621,7 @@
@Override
public void run() {
if (this.rq.isThisOurZnode(rsZnode)) {
if (this.rq.isThisOurRegionServer(rsZnode)) {
return;
}
// Wait a bit before transferring the queues, we may be shutting down.

View File

@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -160,7 +161,7 @@ public class TestReplicationAdmin {
Configuration conf = TEST_UTIL.getConfiguration();
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test HBaseAdmin", null);
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(zkw, conf, null);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
repQueues.init("server1");
// add queue for ID_ONE

View File

@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -94,7 +95,7 @@ public class TestLogsCleaner {
Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer();
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
repQueues.init(server.getServerName().toString());
final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
@@ -87,8 +88,7 @@ public class TestReplicationHFileCleaner {
Replication.decorateMasterConfiguration(conf);
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
rp.init();
rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
rq.init(server.getServerName().toString());
try {
fs = FileSystem.get(conf);

View File

@@ -121,7 +121,7 @@ public abstract class TestReplicationStateBasic {
rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues();
assertNull(rq1.getAllQueues());
assertEquals(0, rq1.getAllQueues().size());
assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
assertNull(rq1.getLogsInQueue("bogus"));
assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());

View File

@@ -0,0 +1,243 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static junit.framework.TestCase.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationStateHBaseImpl {
private static Configuration conf;
private static HBaseTestingUtility utility;
private static Connection connection;
private static ReplicationQueues rqH;
private final String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
utility.startMiniCluster();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
connection = ConnectionFactory.createConnection(conf);
}
@Test
public void checkNamingSchema() throws Exception {
rqH.init(server1);
assertTrue(rqH.isThisOurRegionServer(server1));
assertTrue(!rqH.isThisOurRegionServer(server1 + "a"));
assertTrue(!rqH.isThisOurRegionServer(null));
}
@Test
public void testReplicationStateHBase() {
DummyServer ds = new DummyServer(server1);
try {
rqH = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds, null));
rqH.init(server1);
// Check that the proper System Tables have been generated
Table replicationTable = connection.getTable(
ReplicationQueuesHBaseImpl.REPLICATION_TABLE_NAME);
assertTrue(replicationTable.getName().isSystemTable());
} catch (Exception e) {
e.printStackTrace();
fail("testReplicationStateHBaseConstruction received an Exception");
}
try {
// Test adding in WAL files
assertEquals(0, rqH.getAllQueues().size());
rqH.addLog("Queue1", "WALLogFile1.1");
assertEquals(1, rqH.getAllQueues().size());
rqH.addLog("Queue1", "WALLogFile1.2");
rqH.addLog("Queue1", "WALLogFile1.3");
rqH.addLog("Queue1", "WALLogFile1.4");
rqH.addLog("Queue2", "WALLogFile2.1");
rqH.addLog("Queue3", "WALLogFile3.1");
assertEquals(3, rqH.getAllQueues().size());
assertEquals(4, rqH.getLogsInQueue("Queue1").size());
assertEquals(1, rqH.getLogsInQueue("Queue2").size());
assertEquals(1, rqH.getLogsInQueue("Queue3").size());
// Make sure that abortCount is still 0
assertEquals(0, ds.getAbortCount());
// Make sure that getting a log from a non-existent queue triggers an abort
assertNull(rqH.getLogsInQueue("Queue4"));
assertEquals(1, ds.getAbortCount());
} catch (ReplicationException e) {
e.printStackTrace();
fail("testAddLog received a ReplicationException");
}
try {
// Test updating the log positions
assertEquals(0L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
rqH.setLogPosition("Queue1", "WALLogFile1.1", 123L);
assertEquals(123L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
rqH.setLogPosition("Queue1", "WALLogFile1.1", 123456789L);
assertEquals(123456789L, rqH.getLogPosition("Queue1", "WALLogFile1.1"));
rqH.setLogPosition("Queue2", "WALLogFile2.1", 242L);
assertEquals(242L, rqH.getLogPosition("Queue2", "WALLogFile2.1"));
rqH.setLogPosition("Queue3", "WALLogFile3.1", 243L);
assertEquals(243L, rqH.getLogPosition("Queue3", "WALLogFile3.1"));
// Test that setting log positions in non-existing logs will cause an abort
assertEquals(1, ds.getAbortCount());
rqH.setLogPosition("NotHereQueue", "WALLogFile3.1", 243L);
assertEquals(2, ds.getAbortCount());
rqH.setLogPosition("NotHereQueue", "NotHereFile", 243L);
assertEquals(3, ds.getAbortCount());
rqH.setLogPosition("Queue1", "NotHereFile", 243l);
assertEquals(4, ds.getAbortCount());
// Test reading log positions for non-existent queues and WAL's
try {
rqH.getLogPosition("Queue1", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
"non-existent WAL");
} catch (ReplicationException e) {
}
try {
rqH.getLogPosition("NotHereQueue", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
"non-existent queue");
} catch (ReplicationException e) {
}
// Test removing logs
rqH.removeLog("Queue1", "WALLogFile1.1");
assertEquals(3, rqH.getLogsInQueue("Queue1").size());
// Test removing queues
rqH.removeQueue("Queue2");
assertEquals(2, rqH.getAllQueues().size());
assertNull(rqH.getLogsInQueue("Queue2"));
// Test that getting logs from a non-existent queue aborts
assertEquals(5, ds.getAbortCount());
// Test removing all queues for a Region Server
rqH.removeAllQueues();
assertEquals(0, rqH.getAllQueues().size());
assertNull(rqH.getLogsInQueue("Queue1"));
// Test that getting logs from a non-existent queue aborts
assertEquals(6, ds.getAbortCount());
} catch (ReplicationException e) {
e.printStackTrace();
fail("testAddLog received a ReplicationException");
}
}
static class DummyServer implements Server {
private String serverName;
private boolean isAborted = false;
private boolean isStopped = false;
private int abortCount = 0;
public DummyServer(String serverName) {
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(this.serverName);
}
@Override
public void abort(String why, Throwable e) {
abortCount++;
this.isAborted = true;
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isStopped = true;
}
@Override
public boolean isStopped() {
return this.isStopped;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
return null;
}
public int getAbortCount() {
return abortCount;
}
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -91,9 +93,14 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
DummyServer ds1 = new DummyServer(server1);
DummyServer ds2 = new DummyServer(server2);
DummyServer ds3 = new DummyServer(server3);
rq1 = ReplicationFactory.getReplicationQueues(zkw, conf, ds1);
rq2 = ReplicationFactory.getReplicationQueues(zkw, conf, ds2);
rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
try {
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
} catch (Exception e) {
// This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl
fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
}
rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);

View File

@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
@@ -284,9 +285,11 @@
LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("hostname0.example.org");
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
server.getZooKeeper()));
rq.init(server.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
@@ -326,8 +329,8 @@
public void testCleanupFailoverQueues() throws Exception {
final Server server = new DummyServer("hostname1.example.org");
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
server.getZooKeeper()));
rq.init(server.getServerName().toString());
// populate some znodes in the peer znode
SortedSet<String> files = new TreeSet<String>();
@@ -341,7 +344,8 @@
}
Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
@@ -365,7 +369,8 @@
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
server.getZooKeeper()));
repQueues.init(server.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
@@ -381,16 +386,19 @@
// simulate three servers fail sequentially
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
SortedMap<String, SortedSet<String>> testMap =
rq1.claimQueues(server.getServerName().getServerName());
ReplicationQueues rq2 =
ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
s2.getZooKeeper()));
rq2.init(s2.getServerName().toString());
testMap = rq2.claimQueues(s1.getServerName().getServerName());
ReplicationQueues rq3 =
ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
s3.getZooKeeper()));
rq3.init(s3.getServerName().toString());
testMap = rq3.claimQueues(s2.getServerName().getServerName());
@@ -412,7 +420,8 @@
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server s0 = new DummyServer("cversion-change0.example.org");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
s0.getZooKeeper()));
repQueues.init(s0.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
@@ -423,7 +432,8 @@
// simulate queue transfer
Server s1 = new DummyServer("cversion-change1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationQueuesClient client =
@@ -522,8 +532,8 @@
this.deadRsZnode = znode;
this.server = s;
this.rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
server.getZooKeeper()));
this.rq.init(this.server.getServerName().toString());
}

View File

@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
@@ -1543,7 +1544,8 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
// create replicator
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "Test Hbase Fsck", connection);
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(zkw, conf, connection);
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, connection,
zkw));
repQueues.init("server1");
// queues for current peer, no errors
repQueues.addLog("1", "file1");