HBASE-15974 Create a ReplicationQueuesClientHBaseImpl

Building on HBASE-15958.
Provided a table-based ReplicationQueuesClient implementation (TableBasedReplicationQueuesClientImpl) that relies on the HBase Replication Table to track WAL queues.
Refactored a large section of ReplicationQueuesHBaseImpl (now TableBasedReplicationQueuesImpl) into a ReplicationTableBase class that handles the Replication Table operations.

Signed-off-by: Elliott Clark <eclark@apache.org>
Authored by Joseph Hwang on 2016-06-09 16:16:38 -07:00; committed by Elliott Clark
parent ae5fe1e616
commit 2093aadec1
15 changed files with 759 additions and 375 deletions
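For orientation, below is a minimal sketch (not part of the patch) of how a caller might opt into the table-based implementations introduced here. It assumes the config keys and the factory/arguments signatures shown in this diff; the ZooKeeperWatcher and Abortable are assumed to come from the caller's environment, and the class and method names in the sketch are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class TableBasedReplicationSetupSketch {
  // zkw and abortable are assumed to be provided by the surrounding server context.
  public static ReplicationQueuesClient buildClient(ZooKeeperWatcher zkw, Abortable abortable)
      throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Select the table-based implementations added by this patch.
    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
        TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
    conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
        TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
    // The factory now builds the configured class reflectively from the arguments object.
    ReplicationQueuesClient client = ReplicationFactory.getReplicationQueuesClient(
        new ReplicationQueuesClientArguments(conf, abortable, zkw));
    client.init();
    return client;
  }
}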

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -122,7 +123,8 @@ public class ReplicationAdmin implements Closeable {
zkw = createZooKeeperWatcher();
try {
this.replicationQueuesClient =
ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
this.connection, zkw));
this.replicationQueuesClient.init();
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
this.replicationQueuesClient, this.connection);

View File

@ -38,9 +38,12 @@ public class ReplicationFactory {
return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
}
public static ReplicationQueuesClient getReplicationQueuesClient(final ZooKeeperWatcher zk,
Configuration conf, Abortable abortable) {
return new ReplicationQueuesClientZKImpl(zk, conf, abortable);
public static ReplicationQueuesClient getReplicationQueuesClient(
ReplicationQueuesClientArguments args)
throws Exception {
Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
"replication.ReplicationQueuesClientType", ReplicationQueuesClientZKImpl.class);
return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
}
public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@ -61,11 +62,12 @@ public interface ReplicationQueuesClient {
List<String> getAllQueues(String serverName) throws KeeperException;
/**
* Get the cversion of replication rs node. This can be used as optimistic locking to get a
* consistent snapshot of the replication queues.
* @return cversion of replication rs node
* Load all WALs in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all WALs in ZooKeeper at the start of this call, even if there
* is a concurrent queue failover. However, some newly created WALs during the call may
* not be included.
*/
int getQueuesZNodeCversion() throws KeeperException;
Set<String> getAllWALs() throws KeeperException;
/**
* Get the change version number of replication hfile references node. This can be used as

View File

@ -0,0 +1,35 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@InterfaceAudience.Private
public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
ZooKeeperWatcher zk) {
super(conf, abort, zk);
}
public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) {
super(conf, abort);
}
}

View File

@ -19,7 +19,12 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@ -32,6 +37,12 @@ import org.apache.zookeeper.data.Stat;
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
ReplicationQueuesClient {
Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);
public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
this(args.getZk(), args.getConf(), args.getAbortable());
}
public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
Abortable abortable) {
super(zk, conf, abortable);
@ -74,7 +85,45 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
return result;
}
@Override public int getQueuesZNodeCversion() throws KeeperException {
@Override
public Set<String> getAllWALs() throws KeeperException {
/**
* Load all WALs in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all WALs in ZooKeeper at the start of this call, even if there
* is a concurrent queue failover. However, some newly created WALs during the call may
* not be included.
*/
for (int retry = 0; ; retry++) {
int v0 = getQueuesZNodeCversion();
List<String> rss = getListOfReplicators();
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return ImmutableSet.of();
}
Set<String> wals = Sets.newHashSet();
for (String rs : rss) {
List<String> listOfPeers = getAllQueues(rs);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersWals = getLogsInQueue(rs, id);
if (peersWals != null) {
wals.addAll(peersWals);
}
}
}
int v1 = getQueuesZNodeCversion();
if (v0 == v1) {
return wals;
}
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
v0, v1, retry));
}
}
public int getQueuesZNodeCversion() throws KeeperException {
try {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);

View File

@ -0,0 +1,351 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/*
* Abstract class that provides an interface to the Replication Table, which is currently
* being used for WAL offset tracking.
* The basic schema of this table stores each individual queue as a
* separate row. The row key is a unique identifier of the creating server's name and the
* queueId. Each queue must have the following two columns:
* COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
* COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous servers that have owned this
* queue. The most recent previous owner is the leftmost entry.
* They will also have columns mapping [WAL filename : offset]
*/
@InterfaceAudience.Private
abstract class ReplicationTableBase {
/** Name of the HBase Table used for tracking replication*/
public static final TableName REPLICATION_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
// Column family and column names for Queues in the Replication Table
public static final byte[] CF_QUEUE = Bytes.toBytes("q");
public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
// Column Descriptor for the Replication Table
private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
.setInMemory(true)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// TODO: Figure out which bloom filter to use
.setBloomFilterType(BloomType.NONE);
// The value used to delimit the queueId and server name inside of a queue's row key. Currently a
// hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
// See HBASE-11394.
public static final String ROW_KEY_DELIMITER = "-";
// The value used to delimit server names in the queue history list
public static final String QUEUE_HISTORY_DELIMITER = "|";
/*
* Make sure that HBase table operations for replication have a high number of retries, because
* the server is aborted if any HBase table operation fails. Each operation is retried up to
* 3600 times with a 2 second RPC timeout, which gives it roughly two hours of retries before
* the server is aborted.
*/
private static final int CLIENT_RETRIES = 3600;
private static final int RPC_TIMEOUT = 2000;
private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
protected final Table replicationTable;
protected final Configuration conf;
protected final Abortable abortable;
private final Admin admin;
private final Connection connection;
public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
this.conf = new Configuration(conf);
this.abortable = abort;
decorateConf();
this.connection = ConnectionFactory.createConnection(this.conf);
this.admin = connection.getAdmin();
this.replicationTable = createAndGetReplicationTable();
setTableTimeOuts();
}
/**
* Modify the connection's config so that operations run on the Replication Table are given a
* much larger number of retries
*/
private void decorateConf() {
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
}
/**
* Increases the RPC and operations timeouts for the Replication Table
*/
private void setTableTimeOuts() {
replicationTable.setRpcTimeout(RPC_TIMEOUT);
replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
}
/**
* Build the row key for the given queueId. This will uniquely distinguish it from all other queues
* in the cluster.
* @param serverName The owner of the queue
* @param queueId String identifier of the queue
* @return String representation of the queue's row key
*/
protected String buildQueueRowKey(String serverName, String queueId) {
return queueId + ROW_KEY_DELIMITER + serverName;
}
/**
* Parse the original queueId from a row key
* @param rowKey String representation of a queue's row key
* @return the original queueId
*/
protected String getRawQueueIdFromRowKey(String rowKey) {
return rowKey.split(ROW_KEY_DELIMITER)[0];
}
/**
* Returns a queue's row key given either its raw or reclaimed queueId
*
* @param serverName the name of the server that the queue belongs to
* @param queueId queueId of the queue
* @return byte representation of the queue's row key
*/
protected byte[] queueIdToRowKey(String serverName, String queueId) {
// Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
// then this is not a reclaimed queue.
if (!queueId.contains(ROW_KEY_DELIMITER)) {
return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
// If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
// queue's row key
} else {
return Bytes.toBytes(queueId);
}
}
/**
* Creates a "|" delimited record of the queue's past region server owners.
*
* @param originalHistory the queue's original owner history
* @param oldServer the name of the server that used to own the queue
* @return the queue's new owner history
*/
protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
}
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
*/
protected List<String> getListOfReplicators() {
// scan all of the queues and return a list of all unique OWNER values
Set<String> peerServers = new HashSet<String>();
ResultScanner allQueuesInCluster = null;
try {
Scan scan = new Scan();
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
allQueuesInCluster = replicationTable.getScanner(scan);
for (Result queue : allQueuesInCluster) {
peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
}
} catch (IOException e) {
String errMsg = "Failed getting list of replicators";
abortable.abort(errMsg, e);
} finally {
if (allQueuesInCluster != null) {
allQueuesInCluster.close();
}
}
return new ArrayList<String>(peerServers);
}
protected List<String> getAllQueues(String serverName) {
List<String> allQueues = new ArrayList<String>();
ResultScanner queueScanner = null;
try {
queueScanner = getQueuesBelongingToServer(serverName);
for (Result queue : queueScanner) {
String rowKey = Bytes.toString(queue.getRow());
// If the queue does not have an Owner History, then we must be its original owner. So we
// want to return its queueId in raw form
if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) {
allQueues.add(getRawQueueIdFromRowKey(rowKey));
} else {
allQueues.add(rowKey);
}
}
return allQueues;
} catch (IOException e) {
String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
abortable.abort(errMsg, e);
return null;
} finally {
if (queueScanner != null) {
queueScanner.close();
}
}
}
protected List<String> getLogsInQueue(String serverName, String queueId) {
String rowKey = queueId;
if (!queueId.contains(ROW_KEY_DELIMITER)) {
rowKey = buildQueueRowKey(serverName, queueId);
}
return getLogsInQueue(Bytes.toBytes(rowKey));
}
protected List<String> getLogsInQueue(byte[] rowKey) {
String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
try {
Get getQueue = new Get(rowKey);
Result queue = replicationTable.get(getQueue);
if (queue == null || queue.isEmpty()) {
abortable.abort(errMsg, new ReplicationException(errMsg));
return null;
}
return readWALsFromResult(queue);
} catch (IOException e) {
abortable.abort(errMsg, e);
return null;
}
}
/**
* Read all of the WALs from a queue into a list
*
* @param queue HBase query result containing the queue
* @return a list of all the WAL filenames
*/
protected List<String> readWALsFromResult(Result queue) {
List<String> wals = new ArrayList<>();
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
for (byte[] cQualifier : familyMap.keySet()) {
// Ignore the meta data fields of the queue
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
COL_QUEUE_OWNER_HISTORY)) {
continue;
}
wals.add(Bytes.toString(cQualifier));
}
return wals;
}
/**
* Get the queue ids and metadata (Owner and History) for the queues belonging to the named
* server
*
* @param server name of the server
* @return a ResultScanner over the QueueIds belonging to the server
* @throws IOException
*/
private ResultScanner getQueuesBelongingToServer(String server) throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
ResultScanner results = replicationTable.getScanner(scan);
return results;
}
/**
* Gets the Replication Table. Builds and blocks until the table is available if the Replication
* Table does not exist.
*
* @return the Replication Table
* @throws IOException if the Replication Table takes too long to build
*/
private Table createAndGetReplicationTable() throws IOException {
if (!replicationTableExists()) {
createReplicationTable();
}
int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100);
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100);
RetryCounter retryCounter = counterFactory.create();
while (!replicationTableExists()) {
try {
retryCounter.sleepUntilNextRetry();
if (!retryCounter.shouldRetry()) {
throw new IOException("Unable to acquire the Replication Table");
}
} catch (InterruptedException e) {
return null;
}
}
return connection.getTable(REPLICATION_TABLE_NAME);
}
/**
* Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
* in ReplicationTableBase
* @throws IOException
*/
private void createReplicationTable() throws IOException {
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
admin.createTable(replicationTableDescriptor);
}
/**
* Checks whether the Replication Table exists yet
*
* @return whether the Replication Table exists
* @throws IOException
*/
private boolean replicationTableExists() {
try {
return admin.tableExists(REPLICATION_TABLE_NAME);
} catch (IOException e) {
return false;
}
}
}
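To make the schema described above concrete, here is a small illustrative sketch (not part of the patch; the class and method names are hypothetical) of the Put that a freshly created queue row corresponds to under this layout, mirroring what TableBasedReplicationQueuesImpl.addLog does with the CF_QUEUE constants.

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class QueueRowSketch {
  // The "q" family holds the owner ("o"), the owner history ("h"), and one column per WAL
  // file whose value is the current replication offset into that WAL.
  public static Put newQueueRow(String serverName, String queueId, String walName) {
    // Row key is "<queueId>-<serverName>"; cluster ids are guaranteed not to contain hyphens.
    Put put = new Put(Bytes.toBytes(queueId + "-" + serverName));
    put.addColumn(Bytes.toBytes("q"), Bytes.toBytes("o"), Bytes.toBytes(serverName));
    put.addColumn(Bytes.toBytes("q"), Bytes.toBytes("h"), Bytes.toBytes("")); // no previous owners yet
    put.addColumn(Bytes.toBytes("q"), Bytes.toBytes(walName), Bytes.toBytes(0L)); // initial offset
    return put;
  }
}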

View File

@ -0,0 +1,111 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Implements the ReplicationQueuesClient interface on top of the Replication Table. It utilizes
* the ReplicationTableBase to access the Replication Table.
*/
@InterfaceAudience.Private
public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
implements ReplicationQueuesClient {
public TableBasedReplicationQueuesClientImpl(ReplicationQueuesClientArguments args)
throws IOException {
super(args.getConf(), args.getAbortable());
}
public TableBasedReplicationQueuesClientImpl(Configuration conf,
Abortable abortable) throws IOException {
super(conf, abortable);
}
@Override
public void init() throws ReplicationException{
// no-op
}
@Override
public List<String> getListOfReplicators() {
return super.getListOfReplicators();
}
@Override
public List<String> getLogsInQueue(String serverName, String queueId) {
return super.getLogsInQueue(serverName, queueId);
}
@Override
public List<String> getAllQueues(String serverName) {
return super.getAllQueues(serverName);
}
@Override
public Set<String> getAllWALs() {
Set<String> allWals = new HashSet<String>();
ResultScanner allQueues = null;
try {
allQueues = replicationTable.getScanner(new Scan());
for (Result queue : allQueues) {
for (String wal : readWALsFromResult(queue)) {
allWals.add(wal);
}
}
} catch (IOException e) {
String errMsg = "Failed getting all WAL's in Replication Table";
abortable.abort(errMsg, e);
} finally {
if (allQueues != null) {
allQueues.close();
}
}
return allWals;
}
@Override
public int getHFileRefsNodeChangeVersion() throws KeeperException {
// TODO
throw new NotImplementedException();
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
// TODO
throw new NotImplementedException();
}
@Override
public List<String> getReplicableHFiles(String peerId) throws KeeperException {
// TODO
throw new NotImplementedException();
}
}
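As a usage note, the following sketch (not part of the patch; the helper class is hypothetical) shows how the read-only client can be queried once it has been obtained through the factory, mirroring the calls exercised by the new TestReplicationQueuesClient test further down in this diff.

import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;

public class QueueClientReadSketch {
  // Walks every replicator, prints its queues and their WALs, then returns a global WAL snapshot.
  public static Set<String> dumpQueues(ReplicationQueuesClient client) throws Exception {
    List<String> replicators = client.getListOfReplicators();
    if (replicators != null) {
      for (String replicator : replicators) {
        List<String> queues = client.getAllQueues(replicator);
        if (queues == null) {
          continue; // the region server may have just died
        }
        for (String queueId : queues) {
          System.out.println(replicator + "/" + queueId + " -> "
              + client.getLogsInQueue(replicator, queueId));
        }
      }
    }
    // Snapshot of every WAL currently referenced by any queue (this is what the log cleaner uses).
    return client.getAllWALs();
  }
}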

View File

@ -16,24 +16,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@ -41,16 +33,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.io.IOException;
import java.util.ArrayList;
@ -63,91 +50,44 @@ import java.util.Set;
/**
* This class provides an implementation of the ReplicationQueues interface using an HBase table
* "Replication Table". The basic schema of this table will store each individual queue as a
* seperate row. The row key will be a unique identifier of the creating server's name and the
* queueId. Each queue must have the following two columns:
* COL_OWNER: tracks which server is currently responsible for tracking the queue
* COL_QUEUE_ID: tracks the queue's id as stored in ReplicationSource
* They will also have columns mapping [WAL filename : offset]
* One key difference from the ReplicationQueuesZkImpl is that when queues are reclaimed we
* simply return its HBase row key as its new "queueId"
* "Replication Table". It utilizes the ReplicationTableBase to access the Replication Table.
*/
@InterfaceAudience.Private
public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
implements ReplicationQueues {
public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
implements ReplicationQueues {
private static final Log LOG = LogFactory.getLog(ReplicationQueuesHBaseImpl.class);
/** Name of the HBase Table used for tracking replication*/
public static final TableName REPLICATION_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
// Column family and column names for the Replication Table
private static final byte[] CF = Bytes.toBytes("r");
private static final byte[] COL_OWNER = Bytes.toBytes("o");
private static final byte[] COL_OWNER_HISTORY = Bytes.toBytes("h");
// The value used to delimit the queueId and server name inside of a queue's row key. Currently a
// hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
// See HBASE-11394.
private static String ROW_KEY_DELIMITER = "-";
// Column Descriptor for the Replication Table
private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
new HColumnDescriptor(CF).setMaxVersions(1)
.setInMemory(true)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// TODO: Figure out which bloom filter to use
.setBloomFilterType(BloomType.NONE)
.setCacheDataInL1(true);
private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class);
// Common byte values used in replication offset tracking
private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
/*
* Make sure that HBase table operations for replication have a high number of retries. This is
* because the server is aborted if any HBase table operation fails. Each RPC will be attempted
* 3600 times before exiting. This provides each operation with 2 hours of retries
* before the server is aborted.
*/
private static final int CLIENT_RETRIES = 3600;
private static final int RPC_TIMEOUT = 2000;
private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
private Configuration modifiedConf;
private Admin admin;
private Connection connection;
private Table replicationTable;
private String serverName = null;
private byte[] serverNameBytes = null;
public ReplicationQueuesHBaseImpl(ReplicationQueuesArguments args) {
// TODO: Only use this variable temporarily. Eventually we want to use HBase to store all
// TODO: replication information
private ReplicationStateZKBase replicationState;
public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
this(args.getConf(), args.getAbortable(), args.getZk());
}
public ReplicationQueuesHBaseImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) {
super(zkw, conf, abort);
modifiedConf = new Configuration(conf);
modifiedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw)
throws IOException {
super(conf, abort);
replicationState = new ReplicationStateZKBase(zkw, conf, abort) {};
}
@Override
public void init(String serverName) throws ReplicationException {
try {
this.serverName = serverName;
this.serverNameBytes = Bytes.toBytes(serverName);
// Modify the connection's config so that the Replication Table it returns has a much higher
// number of client retries
this.connection = ConnectionFactory.createConnection(modifiedConf);
this.admin = connection.getAdmin();
replicationTable = createAndGetReplicationTable();
replicationTable.setRpcTimeout(RPC_TIMEOUT);
replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
} catch (IOException e) {
throw new ReplicationException(e);
}
this.serverName = serverName;
this.serverNameBytes = Bytes.toBytes(serverName);
}
@Override
public List<String> getListOfReplicators() {
return super.getListOfReplicators();
}
@Override
@ -169,14 +109,14 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
if (!checkQueueExists(queueId)) {
// Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
putNewQueue.addColumn(CF, COL_OWNER, serverNameBytes);
putNewQueue.addColumn(CF, COL_OWNER_HISTORY, EMPTY_STRING_BYTES);
putNewQueue.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, serverNameBytes);
putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
replicationTable.put(putNewQueue);
} else {
// Otherwise simply add the new log and offset as a new column
Put putNewLog = new Put(queueIdToRowKey(queueId));
putNewLog.addColumn(CF, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
safeQueueUpdate(putNewLog);
}
} catch (IOException | ReplicationException e) {
@ -190,7 +130,7 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
try {
byte[] rowKey = queueIdToRowKey(queueId);
Delete delete = new Delete(rowKey);
delete.addColumns(CF, Bytes.toBytes(filename));
delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
safeQueueUpdate(delete);
} catch (IOException | ReplicationException e) {
String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
@ -204,7 +144,7 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
byte[] rowKey = queueIdToRowKey(queueId);
// Check that the log exists. addLog() must have been called before setLogPosition().
Get checkLogExists = new Get(rowKey);
checkLogExists.addColumn(CF, Bytes.toBytes(filename));
checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
if (!replicationTable.exists(checkLogExists)) {
String errMsg = "Could not set position of non-existent log from queueId=" + queueId +
", filename=" + filename;
@ -213,7 +153,7 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
}
// Update the log offset if it exists
Put walAndOffset = new Put(rowKey);
walAndOffset.addColumn(CF, Bytes.toBytes(filename), Bytes.toBytes(position));
walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
safeQueueUpdate(walAndOffset);
} catch (IOException | ReplicationException e) {
String errMsg = "Failed writing log position queueId=" + queueId + "filename=" +
@ -227,16 +167,16 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
try {
byte[] rowKey = queueIdToRowKey(queueId);
Get getOffset = new Get(rowKey);
getOffset.addColumn(CF, Bytes.toBytes(filename));
getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
Result result = getResultIfOwner(getOffset);
if (result == null || !result.containsColumn(CF, Bytes.toBytes(filename))) {
if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
throw new ReplicationException("Could not read empty result while getting log position " +
"queueId=" + queueId + ", filename=" + filename);
"queueId=" + queueId + ", filename=" + filename);
}
return Bytes.toLong(result.getValue(CF, Bytes.toBytes(filename)));
return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
} catch (IOException e) {
throw new ReplicationException("Could not get position in log for queueId=" + queueId +
", filename=" + filename);
", filename=" + filename);
}
}
@ -251,52 +191,12 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
@Override
public List<String> getLogsInQueue(String queueId) {
byte[] rowKey = queueIdToRowKey(queueId);
return getLogsInQueue(rowKey);
}
private List<String> getLogsInQueue(byte[] rowKey) {
String errMsg = "Could not get logs in queue queueId=" + Bytes.toString(rowKey);
try {
Get getQueue = new Get(rowKey);
Result queue = getResultIfOwner(getQueue);
// The returned queue could be null if we have lost ownership of it
if (queue == null) {
abortable.abort(errMsg, new ReplicationException(errMsg));
return null;
}
return readWALsFromResult(queue);
} catch (IOException e) {
abortable.abort(errMsg, e);
return null;
}
return getLogsInQueueAndCheckOwnership(rowKey);
}
@Override
public List<String> getAllQueues() {
List<String> allQueues = new ArrayList<String>();
ResultScanner queueScanner = null;
try {
queueScanner = this.getQueuesBelongingToServer(serverName);
for (Result queue : queueScanner) {
String rowKey = Bytes.toString(queue.getRow());
// If the queue does not have a Owner History, then we must be its original owner. So we
// want to return its queueId in raw form
if (Bytes.toString(queue.getValue(CF, COL_OWNER_HISTORY)).length() == 0) {
allQueues.add(getRawQueueIdFromRowKey(rowKey));
} else {
allQueues.add(rowKey);
}
}
return allQueues;
} catch (IOException e) {
String errMsg = "Failed getting list of all replication queues";
abortable.abort(errMsg, e);
return null;
} finally {
if (queueScanner != null) {
queueScanner.close();
}
}
return getAllQueues(serverName);
}
@Override
@ -307,12 +207,12 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
}
ResultScanner queuesToClaim = null;
try {
queuesToClaim = this.getQueuesBelongingToServer(regionserver);
queuesToClaim = getAllQueuesScanner(regionserver);
for (Result queue : queuesToClaim) {
if (attemptToClaimQueue(queue, regionserver)) {
String rowKey = Bytes.toString(queue.getRow());
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
if (peerExists(replicationQueueInfo.getPeerId())) {
if (replicationState.peerExists(replicationQueueInfo.getPeerId())) {
Set<String> sortedLogs = new HashSet<String>();
List<String> logs = getLogsInQueue(queue.getRow());
for (String log : logs) {
@ -324,7 +224,7 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
// Delete orphaned queues
removeQueue(Bytes.toString(queue.getRow()));
LOG.info(serverName + " has deleted abandoned queue " + rowKey + " from " +
regionserver);
regionserver);
}
}
}
@ -340,27 +240,22 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
return queues;
}
@Override
public List<String> getListOfReplicators() {
// scan all of the queues and return a list of all unique OWNER values
Set<String> peerServers = new HashSet<String>();
ResultScanner allQueuesInCluster = null;
try {
Scan scan = new Scan();
scan.addColumn(CF, COL_OWNER);
allQueuesInCluster = replicationTable.getScanner(scan);
for (Result queue : allQueuesInCluster) {
peerServers.add(Bytes.toString(queue.getValue(CF, COL_OWNER)));
}
} catch (IOException e) {
String errMsg = "Failed getting list of replicators";
abortable.abort(errMsg, e);
} finally {
if (allQueuesInCluster != null) {
allQueuesInCluster.close();
}
}
return new ArrayList<String>(peerServers);
/**
* Get the QueueIds belonging to the named server from the ReplicationTableBase
*
* @param server name of the server
* @return a ResultScanner over the QueueIds belonging to the server
* @throws IOException
*/
private ResultScanner getAllQueuesScanner(String server) throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
ResultScanner results = replicationTable.getScanner(scan);
return results;
}
@Override
@ -392,68 +287,31 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
throw new NotImplementedException();
}
/**
* Gets the Replication Table. Builds and blocks until the table is available if the Replication
* Table does not exist.
*
* @return the Replication Table
* @throws IOException if the Replication Table takes too long to build
*/
private Table createAndGetReplicationTable() throws IOException {
if (!replicationTableExists()) {
createReplicationTable();
}
int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100);
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100);
RetryCounter retryCounter = counterFactory.create();
while (!replicationTableExists()) {
try {
retryCounter.sleepUntilNextRetry();
if (!retryCounter.shouldRetry()) {
throw new IOException("Unable to acquire the Replication Table");
}
} catch (InterruptedException e) {
private List<String> getLogsInQueueAndCheckOwnership(byte[] rowKey) {
String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
List<String> logs = new ArrayList<String>();
try {
Get getQueue = new Get(rowKey);
Result queue = getResultIfOwner(getQueue);
if (queue == null || queue.isEmpty()) {
String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
return null;
}
}
return connection.getTable(REPLICATION_TABLE_NAME);
}
/**
* Checks whether the Replication Table exists yet
*
* @return whether the Replication Table exists
* @throws IOException
*/
private boolean replicationTableExists() {
try {
return admin.tableExists(REPLICATION_TABLE_NAME);
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
for(byte[] cQualifier : familyMap.keySet()) {
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
COL_QUEUE_OWNER_HISTORY)) {
continue;
}
logs.add(Bytes.toString(cQualifier));
}
} catch (IOException e) {
return false;
abortable.abort(errMsg, e);
return null;
}
}
/**
* Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
* in ReplicationQueuesHBaseImpl
*
* @throws IOException
*/
private void createReplicationTable() throws IOException {
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
admin.createTable(replicationTableDescriptor);
}
/**
* Build the row key for the given queueId. This will uniquely identify it from all other queues
* in the cluster.
* @param serverName The owner of the queue
* @param queueId String identifier of the queue
* @return String representation of the queue's row key
*/
private String buildQueueRowKey(String serverName, String queueId) {
return queueId + ROW_KEY_DELIMITER + serverName;
return logs;
}
private String buildQueueRowKey(String queueId) {
@ -461,12 +319,12 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
}
/**
* Parse the original queueId from a row key
* @param rowKey String representation of a queue's row key
* @return the original queueId
* Convenience method that gets the row key of the queue specified by queueId
* @param queueId queueId of a queue in this server
* @return the row key of the queue in the Replication Table
*/
private String getRawQueueIdFromRowKey(String rowKey) {
return rowKey.split(ROW_KEY_DELIMITER)[0];
private byte[] queueIdToRowKey(String queueId) {
return queueIdToRowKey(serverName, queueId);
}
/**
@ -486,7 +344,7 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
* @param delete Row mutation to perform on the queue
*/
private void safeQueueUpdate(Delete delete) throws ReplicationException,
IOException{
IOException{
RowMutations mutations = new RowMutations(delete.getRow());
mutations.add(delete);
safeQueueUpdate(mutations);
@ -500,50 +358,14 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
* @param mutate Mutation to perform on a given queue
*/
private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF, COL_OWNER,
CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE,
COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
if (!updateSuccess) {
throw new ReplicationException("Failed to update Replication Table because we lost queue " +
" ownership");
}
}
/**
* Returns a queue's row key given either its raw or reclaimed queueId
*
* @param queueId queueId of the queue
* @return byte representation of the queue's row key
*/
private byte[] queueIdToRowKey(String queueId) {
// Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
// then this is not a reclaimed queue.
if (!queueId.contains(ROW_KEY_DELIMITER)) {
return Bytes.toBytes(buildQueueRowKey(queueId));
// If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
// queue's row key
} else {
return Bytes.toBytes(queueId);
}
}
/**
* Get the QueueIds belonging to the named server from the ReplicationTable
*
* @param server name of the server
* @return a ResultScanner over the QueueIds belonging to the server
* @throws IOException
*/
private ResultScanner getQueuesBelongingToServer(String server) throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF, COL_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF, COL_OWNER);
scan.addColumn(CF, COL_OWNER_HISTORY);
ResultScanner results = replicationTable.getScanner(scan);
return results;
}
/**
* Check if the queue specified by queueId is stored in HBase
*
@ -556,25 +378,6 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
return replicationTable.exists(new Get(rowKey));
}
/**
* Read all of the WAL's from a queue into a list
*
* @param queue HBase query result containing the queue
* @return a list of all the WAL filenames
*/
private List<String> readWALsFromResult(Result queue) {
List<String> wals = new ArrayList<>();
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF);
for(byte[] cQualifier : familyMap.keySet()) {
// Ignore the meta data fields of the queue
if (Arrays.equals(cQualifier, COL_OWNER) || Arrays.equals(cQualifier, COL_OWNER_HISTORY)) {
continue;
}
wals.add(Bytes.toString(cQualifier));
}
return wals;
}
/**
* Attempt to claim the given queue with a checkAndPut on the OWNER column. We check that the
* recently killed server is still the OWNER before we claim it.
@ -586,37 +389,27 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
*/
private boolean attemptToClaimQueue (Result queue, String originalServer) throws IOException{
Put putQueueNameAndHistory = new Put(queue.getRow());
putQueueNameAndHistory.addColumn(CF, COL_OWNER, Bytes.toBytes(serverName));
String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF,
COL_OWNER_HISTORY)), originalServer);
putQueueNameAndHistory.addColumn(CF, COL_OWNER_HISTORY, Bytes.toBytes(newOwnerHistory));
putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(serverName));
String newOwnerHistory = buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE,
COL_QUEUE_OWNER_HISTORY)), originalServer);
putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY,
Bytes.toBytes(newOwnerHistory));
RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
claimAndRenameQueue.add(putQueueNameAndHistory);
// Attempt to claim ownership for this queue by checking if the current OWNER is the original
// server. If it is not then another RS has already claimed it. If it is we set ourselves as the
// new owner and update the queue's history
boolean success = replicationTable.checkAndMutate(queue.getRow(), CF, COL_OWNER,
boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue);
return success;
}
/**
* Creates a "|" delimited record of the queue's past region server owners.
*
* @param originalHistory the queue's original owner history
* @param oldServer the name of the server that used to own the queue
* @return the queue's new owner history
*/
private String buildClaimedQueueHistory(String originalHistory, String oldServer) {
return originalHistory + "|" + oldServer;
}
/**
* Attempts to run a Get on some queue. Will only return a non-null result if we currently own
* the queue.
*
* @param get The get that we want to query
* @return The result of the get if this server is the owner of the queue. Else it returns null
* @param get The Get that we want to query
* @return The result of the Get if this server is the owner of the queue. Else it returns null.
* @throws IOException
*/
private Result getResultIfOwner(Get get) throws IOException {
@ -624,10 +417,10 @@ public class ReplicationQueuesHBaseImpl extends ReplicationStateZKBase
// Check if the Get currently contains all columns or only specific columns
if (scan.getFamilyMap().size() > 0) {
// Add the OWNER column if the scan is already only over specific columns
scan.addColumn(CF, COL_OWNER);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
}
scan.setMaxResultSize(1);
SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF, COL_OWNER,
SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, serverNameBytes);
scan.setFilter(checkOwner);
ResultScanner scanner = null;

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -141,15 +142,16 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
super.setConf(conf);
try {
initReplicationQueuesClient(conf, zk);
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
}
private void initReplicationQueuesClient(Configuration conf, ZooKeeperWatcher zk)
throws ZooKeeperConnectionException, IOException {
throws Exception {
this.zkw = zk;
this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new WarnOnlyAbortable());
this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
conf, new WarnOnlyAbortable(), zkw));
}
@Override

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import java.io.IOException;
import java.util.Collections;
@ -67,7 +68,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
try {
// The concurrently created new WALs may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
wals = loadWALsFromQueues();
wals = replicationQueues.getAllWALs();
} catch (KeeperException e) {
LOG.warn("Failed to read zookeeper, skipping checking deletable files");
return Collections.emptyList();
@ -88,43 +89,6 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
}});
}
/**
* Load all wals in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all WALs in the zookeeper at the start of this call even there
* is concurrent queue failover. However, some newly created WALs during the call may
* not be included.
*/
private Set<String> loadWALsFromQueues() throws KeeperException {
for (int retry = 0; ; retry++) {
int v0 = replicationQueues.getQueuesZNodeCversion();
List<String> rss = replicationQueues.getListOfReplicators();
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return ImmutableSet.of();
}
Set<String> wals = Sets.newHashSet();
for (String rs : rss) {
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
if (peersWals != null) {
wals.addAll(peersWals);
}
}
}
int v1 = replicationQueues.getQueuesZNodeCversion();
if (v0 == v1) {
return wals;
}
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
v0, v1, retry));
}
}
@Override
public void setConf(Configuration config) {
// If replication is disabled, keep all members null
@ -148,10 +112,10 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
super.setConf(conf);
try {
this.zkw = zk;
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf,
new WarnOnlyAbortable());
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw));
this.replicationQueues.init();
} catch (ReplicationException e) {
} catch (Exception e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
@ -67,13 +68,14 @@ public class ReplicationChecker {
try {
this.zkw = zkw;
this.errorReporter = errorReporter;
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
this.queuesClient = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, connection, zkw));
this.queuesClient.init();
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
connection);
this.replicationPeers.init();
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
} catch (ReplicationException e) {
} catch (Exception e) {
throw new IOException("failed to construct ReplicationChecker", e);
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -167,7 +168,7 @@ public class TestLogsCleaner {
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
cleaner.setConf(conf);
ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class);
ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class);
Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");

View File

@ -39,6 +39,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -58,15 +59,20 @@ public class TestReplicationStateHBaseImpl {
private static ReplicationQueues rq1;
private static ReplicationQueues rq2;
private static ReplicationQueues rq3;
private static ReplicationQueuesClient rqc;
private static ReplicationPeers rp;
private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 123L)
.toString();
private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
.toString();
private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
.toString();
private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L)
.toString();
private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L)
.toString();
private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
.toString();
private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
.toString();
private static DummyServer ds0;
private static DummyServer ds1;
private static DummyServer ds2;
private static DummyServer ds3;
@ -77,9 +83,9 @@ public class TestReplicationStateHBaseImpl {
utility.startMiniCluster();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
@ -88,6 +94,9 @@ public class TestReplicationStateHBaseImpl {
@Before
public void setUp() {
try {
ds0 = new DummyServer(server0);
rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
conf, ds0));
ds1 = new DummyServer(server1);
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq1.init(server1);
@ -99,9 +108,6 @@ public class TestReplicationStateHBaseImpl {
rq3.init(server3);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
rp.init();
rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
} catch (Exception e) {
fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage());
}
@ -165,13 +171,13 @@ public class TestReplicationStateHBaseImpl {
try {
rq1.getLogPosition("Queue1", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
"non-existent WAL");
"non-existent WAL");
} catch (ReplicationException e) {
}
try {
rq1.getLogPosition("NotHereQueue", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
"non-existent queue");
"non-existent queue");
} catch (ReplicationException e) {
}
// Test removing logs
@ -197,6 +203,13 @@ public class TestReplicationStateHBaseImpl {
@Test
public void TestMultipleReplicationQueuesHBaseImpl () {
try {
rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
} catch (ReplicationException e) {
fail("Failed to add peers to ReplicationPeers");
}
try {
// Test adding in WAL files
rq1.addLog("Queue1", "WALLogFile1.1");
@ -298,6 +311,56 @@ public class TestReplicationStateHBaseImpl {
}
}
@Test
public void TestReplicationQueuesClient() throws Exception{
// Test ReplicationQueuesClient log tracking
rq1.addLog("Queue1", "WALLogFile1.1");
assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size());
rq1.removeLog("Queue1", "WALLogFile1.1");
assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size());
rq2.addLog("Queue2", "WALLogFile2.1");
rq2.addLog("Queue2", "WALLogFile2.2");
assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size());
rq3.addLog("Queue1", "WALLogFile1.1");
rq3.addLog("Queue3", "WALLogFile3.1");
rq3.addLog("Queue3", "WALLogFile3.2");
// Test ReplicationQueueClient log tracking for faulty cases
assertEquals(0, ds0.getAbortCount());
assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue"));
assertNull(rqc.getLogsInQueue(server1, "NotHereQueue"));
assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1"));
assertEquals(3, ds0.getAbortCount());
// Test ReplicationQueueClient replicators
List<String> replicators = rqc.getListOfReplicators();
assertEquals(3, replicators.size());
assertTrue(replicators.contains(server1));
assertTrue(replicators.contains(server2));
rq1.removeQueue("Queue1");
assertEquals(2, rqc.getListOfReplicators().size());
// Test ReplicationQueuesClient queue tracking
assertEquals(0, rqc.getAllQueues(server1).size());
rq1.addLog("Queue2", "WALLogFile2.1");
rq1.addLog("Queue3", "WALLogFile3.1");
assertEquals(2, rqc.getAllQueues(server1).size());
rq1.removeAllQueues();
assertEquals(0, rqc.getAllQueues(server1).size());
// Test ReplicationQueuesClient queue tracking for faulty cases
assertEquals(0, rqc.getAllQueues("NotHereServer").size());
// Test ReplicationQueuesClient get all WAL's
assertEquals(5, rqc.getAllWALs().size());
rq3.removeLog("Queue1", "WALLogFile1.1");
assertEquals(4, rqc.getAllWALs().size());
rq3.removeAllQueues();
assertEquals(2, rqc.getAllWALs().size());
rq2.removeAllQueues();
assertEquals(0, rqc.getAllWALs().size());
}
@After
public void clearQueues() throws Exception{
rq1.removeAllQueues();
@ -306,6 +369,7 @@ public class TestReplicationStateHBaseImpl {
assertEquals(0, rq1.getAllQueues().size());
assertEquals(0, rq2.getAllQueues().size());
assertEquals(0, rq3.getAllQueues().size());
ds0.resetAbortCount();
ds1.resetAbortCount();
ds2.resetAbortCount();
ds3.resetAbortCount();
@ -313,7 +377,7 @@ public class TestReplicationStateHBaseImpl {
@After
public void tearDown() throws KeeperException, IOException {
ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
}
@AfterClass

View File

@ -96,11 +96,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
rqc = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, ds1, zkw));
} catch (Exception e) {
// This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl
// This should not occur, because getReplicationQueues() only throws for
// TableBasedReplicationQueuesImpl
fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
}
rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);

View File

@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@ -436,8 +438,9 @@ public class TestReplicationSourceManager {
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationQueuesClient client =
ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
ReplicationQueuesClientZKImpl client =
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
int v0 = client.getQueuesZNodeCversion();
rq1.claimQueues(s0.getServerName().getServerName());