HBASE-16036 Made Replication Table creation non-blocking.

All ReplicationTableBase method's that need to access the Replication Table will block until it is created though.
Also refactored ReplicationSourceManager so that abandoned queue adoption is run in the background too so that it does not block HRegionServer initialization.

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Joseph Hwang 2016-06-15 14:35:56 -07:00 committed by Elliott Clark
parent b006e41a37
commit 152594560e
11 changed files with 581 additions and 253 deletions

View File

@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
* ReplicationQueues Implementations with different constructor arguments by reflection.
*/
@InterfaceAudience.Private
public class ReplicationQueuesArguments {

View File

@ -23,6 +23,11 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct
* various ReplicationQueuesClient Implementations with different constructor arguments by
* reflection.
*/
@InterfaceAudience.Private
public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,

View File

@ -18,12 +18,14 @@
*/
package org.apache.hadoop.hbase.replication;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
@ -42,12 +44,18 @@ import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/*
* Abstract class that provides an interface to the Replication Table. Which is currently
@ -59,8 +67,10 @@ import java.util.Set;
* COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this
* queue. The most recent previous owner is the leftmost entry.
* They will also have columns mapping [WAL filename : offset]
* The most flexible method of interacting with the Replication Table is by calling
* getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
* to the caller to close the returned table.
*/
@InterfaceAudience.Private
abstract class ReplicationTableBase {
@ -99,20 +109,23 @@ abstract class ReplicationTableBase {
private static final int RPC_TIMEOUT = 2000;
private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
protected final Table replicationTable;
// We only need a single thread to initialize the Replication Table
private static final int NUM_INITIALIZE_WORKERS = 1;
protected final Configuration conf;
protected final Abortable abortable;
private final Admin admin;
private final Connection connection;
private final Executor executor;
private volatile CountDownLatch replicationTableInitialized;
public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
this.conf = new Configuration(conf);
this.abortable = abort;
decorateConf();
this.connection = ConnectionFactory.createConnection(this.conf);
this.admin = connection.getAdmin();
this.replicationTable = createAndGetReplicationTable();
setTableTimeOuts();
this.executor = setUpExecutor();
this.replicationTableInitialized = new CountDownLatch(1);
createReplicationTableInBackground();
}
/**
@ -123,12 +136,35 @@ abstract class ReplicationTableBase {
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
}
/**
* Sets up the thread pool executor used to build the Replication Table in the background
* @return the configured executor
*/
private Executor setUpExecutor() {
ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationTableExecutor-%d");
tfb.setDaemon(true);
tempExecutor.setThreadFactory(tfb.build());
return tempExecutor;
}
/**
* Get whether the Replication Table has been successfully initialized yet
* @return whether the Replication Table is initialized
*/
public boolean getInitializationStatus() {
return replicationTableInitialized.getCount() == 0;
}
/**
* Increases the RPC and operations timeouts for the Replication Table
*/
private void setTableTimeOuts() {
private Table setReplicationTableTimeOuts(Table replicationTable) {
replicationTable.setRpcTimeout(RPC_TIMEOUT);
replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
return replicationTable;
}
/**
@ -189,7 +225,7 @@ abstract class ReplicationTableBase {
// scan all of the queues and return a list of all unique OWNER values
Set<String> peerServers = new HashSet<String>();
ResultScanner allQueuesInCluster = null;
try {
try (Table replicationTable = getOrBlockOnReplicationTable()){
Scan scan = new Scan();
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
allQueuesInCluster = replicationTable.getScanner(scan);
@ -244,7 +280,7 @@ abstract class ReplicationTableBase {
protected List<String> getLogsInQueue(byte[] rowKey) {
String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
try {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
Get getQueue = new Get(rowKey);
Result queue = replicationTable.get(getQueue);
if (queue == null || queue.isEmpty()) {
@ -286,66 +322,120 @@ abstract class ReplicationTableBase {
* @return a ResultScanner over the QueueIds belonging to the server
* @throws IOException
*/
private ResultScanner getQueuesBelongingToServer(String server) throws IOException {
protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
ResultScanner results = replicationTable.getScanner(scan);
return results;
try (Table replicationTable = getOrBlockOnReplicationTable()) {
ResultScanner results = replicationTable.getScanner(scan);
return results;
}
}
/**
* Gets the Replication Table. Builds and blocks until the table is available if the Replication
* Table does not exist.
* Attempts to acquire the Replication Table. This operation will block until it is assigned by
* the CreateReplicationWorker thread. It is up to the caller of this method to close the
* returned Table
* @return the Replication Table when it is created
* @throws IOException
*/
protected Table getOrBlockOnReplicationTable() throws IOException {
// Sleep until the Replication Table becomes available
try {
replicationTableInitialized.await();
} catch (InterruptedException e) {
String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
e.getMessage();
throw new InterruptedIOException(errMsg);
}
return getAndSetUpReplicationTable();
}
/**
* Creates a new copy of the Replication Table and sets up the proper Table time outs for it
*
* @return the Replication Table
* @throws IOException
*/
private Table getAndSetUpReplicationTable() throws IOException {
Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
setReplicationTableTimeOuts(replicationTable);
return replicationTable;
}
/**
* Builds the Replication Table in a background thread. Any method accessing the Replication Table
* should do so through getOrBlockOnReplicationTable()
*
* @return the Replication Table
* @throws IOException if the Replication Table takes too long to build
*/
private Table createAndGetReplicationTable() throws IOException {
if (!replicationTableExists()) {
createReplicationTable();
}
int maxRetries = conf.getInt("replication.queues.createtable.retries.number", 100);
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, 100);
RetryCounter retryCounter = counterFactory.create();
while (!replicationTableExists()) {
private void createReplicationTableInBackground() throws IOException {
executor.execute(new CreateReplicationTableWorker());
}
/**
* Attempts to build the Replication Table. Will continue blocking until we have a valid
* Table for the Replication Table.
*/
private class CreateReplicationTableWorker implements Runnable {
private Admin admin;
@Override
public void run() {
try {
retryCounter.sleepUntilNextRetry();
if (!retryCounter.shouldRetry()) {
throw new IOException("Unable to acquire the Replication Table");
admin = connection.getAdmin();
if (!replicationTableExists()) {
createReplicationTable();
}
} catch (InterruptedException e) {
return null;
int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
CLIENT_RETRIES);
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
RetryCounter retryCounter = counterFactory.create();
while (!replicationTableExists()) {
retryCounter.sleepUntilNextRetry();
if (!retryCounter.shouldRetry()) {
throw new IOException("Unable to acquire the Replication Table");
}
}
replicationTableInitialized.countDown();
} catch (IOException | InterruptedException e) {
abortable.abort("Failed building Replication Table", e);
}
}
return connection.getTable(REPLICATION_TABLE_NAME);
}
/**
* Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
* in TableBasedReplicationQueuesImpl
* @throws IOException
*/
private void createReplicationTable() throws IOException {
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
admin.createTable(replicationTableDescriptor);
}
/**
* Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
* in TableBasedReplicationQueuesImpl
*
* @throws IOException
*/
private void createReplicationTable() throws IOException {
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
try {
admin.createTable(replicationTableDescriptor);
} catch (TableExistsException e) {
// In this case we can just continue as normal
}
}
/**
* Checks whether the Replication Table exists yet
*
* @return whether the Replication Table exists
* @throws IOException
*/
private boolean replicationTableExists() {
try {
return admin.tableExists(REPLICATION_TABLE_NAME);
} catch (IOException e) {
return false;
/**
* Checks whether the Replication Table exists yet
*
* @return whether the Replication Table exists
* @throws IOException
*/
private boolean replicationTableExists() {
try {
return admin.tableExists(REPLICATION_TABLE_NAME);
} catch (IOException e) {
return false;
}
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
@ -73,7 +74,7 @@ public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase
public Set<String> getAllWALs() {
Set<String> allWals = new HashSet<String>();
ResultScanner allQueues = null;
try {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
allQueues = replicationTable.getScanner(new Scan());
for (Result queue : allQueues) {
for (String wal : readWALsFromResult(queue)) {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
@ -105,7 +106,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
@Override
public void addLog(String queueId, String filename) throws ReplicationException {
try {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
if (!checkQueueExists(queueId)) {
// Each queue will have an Owner, OwnerHistory, and a collection of [WAL:offset] key values
Put putNewQueue = new Put(Bytes.toBytes(buildQueueRowKey(queueId)));
@ -140,7 +141,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
@Override
public void setLogPosition(String queueId, String filename, long position) {
try {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
byte[] rowKey = queueIdToRowKey(queueId);
// Check that the log exists. addLog() must have been called before setLogPosition().
Get checkLogExists = new Get(rowKey);
@ -190,8 +191,31 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
@Override
public List<String> getLogsInQueue(String queueId) {
String errMsg = "Failed getting logs in queue queueId=" + queueId;
byte[] rowKey = queueIdToRowKey(queueId);
return getLogsInQueueAndCheckOwnership(rowKey);
List<String> logs = new ArrayList<String>();
try {
Get getQueue = new Get(rowKey);
Result queue = getResultIfOwner(getQueue);
if (queue == null || queue.isEmpty()) {
String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
return null;
}
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
for(byte[] cQualifier : familyMap.keySet()) {
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
COL_QUEUE_OWNER_HISTORY)) {
continue;
}
logs.add(Bytes.toString(cQualifier));
}
} catch (IOException e) {
abortable.abort(errMsg, e);
return null;
}
return logs;
}
@Override
@ -207,7 +231,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
}
ResultScanner queuesToClaim = null;
try {
queuesToClaim = getAllQueuesScanner(regionserver);
queuesToClaim = getQueuesBelongingToServer(regionserver);
for (Result queue : queuesToClaim) {
if (attemptToClaimQueue(queue, regionserver)) {
String rowKey = Bytes.toString(queue.getRow());
@ -240,24 +264,6 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
return queues;
}
/**
* Get the QueueIds belonging to the named server from the ReplicationTableBase
*
* @param server name of the server
* @return a ResultScanner over the QueueIds belonging to the server
* @throws IOException
*/
private ResultScanner getAllQueuesScanner(String server) throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
ResultScanner results = replicationTable.getScanner(scan);
return results;
}
@Override
public boolean isThisOurRegionServer(String regionserver) {
return this.serverName.equals(regionserver);
@ -287,33 +293,6 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
throw new NotImplementedException();
}
private List<String> getLogsInQueueAndCheckOwnership(byte[] rowKey) {
String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
List<String> logs = new ArrayList<String>();
try {
Get getQueue = new Get(rowKey);
Result queue = getResultIfOwner(getQueue);
if (queue == null || queue.isEmpty()) {
String errMsgLostOwnership = "Failed getting logs for queue queueId=" +
Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
return null;
}
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
for(byte[] cQualifier : familyMap.keySet()) {
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
COL_QUEUE_OWNER_HISTORY)) {
continue;
}
logs.add(Bytes.toString(cQualifier));
}
} catch (IOException e) {
abortable.abort(errMsg, e);
return null;
}
return logs;
}
private String buildQueueRowKey(String queueId) {
return buildQueueRowKey(serverName, queueId);
}
@ -358,11 +337,13 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
* @param mutate Mutation to perform on a given queue
*/
private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException{
boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE,
COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
if (!updateSuccess) {
throw new ReplicationException("Failed to update Replication Table because we lost queue " +
" ownership");
try (Table replicationTable = getOrBlockOnReplicationTable()) {
boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(),
CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, serverNameBytes, mutate);
if (!updateSuccess) {
throw new ReplicationException("Failed to update Replication Table because we lost queue " +
" ownership");
}
}
}
@ -374,8 +355,10 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
* @throws IOException
*/
private boolean checkQueueExists(String queueId) throws IOException {
byte[] rowKey = queueIdToRowKey(queueId);
return replicationTable.exists(new Get(rowKey));
try (Table replicationTable = getOrBlockOnReplicationTable()) {
byte[] rowKey = queueIdToRowKey(queueId);
return replicationTable.exists(new Get(rowKey));
}
}
/**
@ -399,9 +382,12 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
// Attempt to claim ownership for this queue by checking if the current OWNER is the original
// server. If it is not then another RS has already claimed it. If it is we set ourselves as the
// new owner and update the queue's history
boolean success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE, COL_QUEUE_OWNER,
CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue);
return success;
try (Table replicationTable = getOrBlockOnReplicationTable()) {
boolean success = replicationTable.checkAndMutate(queue.getRow(),
CF_QUEUE, COL_QUEUE_OWNER, CompareFilter.CompareOp.EQUAL, Bytes.toBytes(originalServer),
claimAndRenameQueue);
return success;
}
}
/**
@ -424,7 +410,7 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
CompareFilter.CompareOp.EQUAL, serverNameBytes);
scan.setFilter(checkOwner);
ResultScanner scanner = null;
try {
try (Table replicationTable = getOrBlockOnReplicationTable()) {
scanner = replicationTable.getScanner(scan);
Result result = scanner.next();
return (result == null || result.isEmpty()) ? null : result;

View File

@ -238,19 +238,11 @@ public class ReplicationSourceManager implements ReplicationListener {
this.replicationQueues.addPeerToHFileRefs(id);
}
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) {
return;
}
List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+ otherRegionServers);
// Look if there's anything to process after a restart
for (String rs : currentReplicators) {
if (!otherRegionServers.contains(rs)) {
transferQueues(rs);
}
AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
try {
this.executor.execute(adoptionWorker);
} catch (RejectedExecutionException ex) {
LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
}
}
@ -705,6 +697,31 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
class AdoptAbandonedQueuesWorker extends Thread{
public AdoptAbandonedQueuesWorker() {}
@Override
public void run() {
List<String> currentReplicators = replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.size() == 0) {
return;
}
List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+ otherRegionServers);
// Look if there's anything to process after a restart
for (String rs : currentReplicators) {
if (!otherRegionServers.contains(rs)) {
transferQueues(rs);
}
}
}
}
/**
* Get the directory where wals are archived
* @return the directory where wals are archived

View File

@ -80,12 +80,12 @@ public class TestReplicationStateHBaseImpl {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
utility.startMiniCluster();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility.startMiniCluster();
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);

View File

@ -0,0 +1,109 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests ReplicationTableBase behavior when the Master startup is delayed. The table initialization
* should be non-blocking, but any method calls that access the table should be blocking.
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationTableBase {
private static long SLEEP_MILLIS = 5000;
private static long TIME_OUT_MILLIS = 3000;
private static Configuration conf;
private static HBaseTestingUtility utility;
private static ZooKeeperWatcher zkw;
private static ReplicationTableBase rb;
private static ReplicationQueues rq;
private static ReplicationQueuesClient rqc;
private volatile boolean asyncRequestSuccess = false;
@Test
public void testSlowStartup() throws Exception{
utility = new HBaseTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
rb = new ReplicationTableBase(conf, zkw) {};
rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
conf, zkw, zkw));
rqc = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, zkw, zkw));
return true;
}
@Override
public String explainFailure() throws Exception {
return "Failed to initialize ReplicationTableBase, TableBasedReplicationQueuesClient and " +
"TableBasedReplicationQueues after a timeout=" + TIME_OUT_MILLIS +
" ms. Their initialization " + "should be non-blocking";
}
});
final RequestReplicationQueueData async = new RequestReplicationQueueData();
async.start();
Thread.sleep(SLEEP_MILLIS);
// Test that the Replication Table has not been assigned and the methods are blocking
assertFalse(rb.getInitializationStatus());
assertFalse(asyncRequestSuccess);
utility.startMiniCluster();
// Test that the methods do return the correct results after getting the table
utility.waitFor(0, TIME_OUT_MILLIS, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
async.join();
return true;
}
@Override
public String explainFailure() throws Exception {
return "ReplicationQueue failed to return list of replicators even after Replication Table "
+ "was initialized timeout=" + TIME_OUT_MILLIS + " ms";
}
});
assertTrue(asyncRequestSuccess);
}
public class RequestReplicationQueueData extends Thread {
@Override
public void run() {
assertEquals(0, rq.getListOfReplicators().size());
asyncRequestSuccess = true;
}
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -64,13 +65,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -88,69 +84,63 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Sets;
/**
* An abstract class that tests ReplicationSourceManager. Classes that extend this class should
* set up the proper config for this class and initialize the proper cluster using
* HBaseTestingUtility.
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationSourceManager {
public abstract class TestReplicationSourceManager {
private static final Log LOG =
protected static final Log LOG =
LogFactory.getLog(TestReplicationSourceManager.class);
private static Configuration conf;
protected static Configuration conf;
private static HBaseTestingUtility utility;
protected static HBaseTestingUtility utility;
private static Replication replication;
protected static Replication replication;
private static ReplicationSourceManager manager;
protected static ReplicationSourceManager manager;
private static ZooKeeperWatcher zkw;
protected static ZooKeeperWatcher zkw;
private static HTableDescriptor htd;
protected static HTableDescriptor htd;
private static HRegionInfo hri;
protected static HRegionInfo hri;
private static final byte[] r1 = Bytes.toBytes("r1");
protected static final byte[] r1 = Bytes.toBytes("r1");
private static final byte[] r2 = Bytes.toBytes("r2");
protected static final byte[] r2 = Bytes.toBytes("r2");
private static final byte[] f1 = Bytes.toBytes("f1");
protected static final byte[] f1 = Bytes.toBytes("f1");
private static final byte[] f2 = Bytes.toBytes("f2");
protected static final byte[] f2 = Bytes.toBytes("f2");
private static final TableName test =
protected static final TableName test =
TableName.valueOf("test");
private static final String slaveId = "1";
protected static final String slaveId = "1";
private static FileSystem fs;
protected static FileSystem fs;
private static Path oldLogDir;
protected static Path oldLogDir;
private static Path logDir;
protected static Path logDir;
private static CountDownLatch latch;
protected static CountDownLatch latch;
private static List<String> files = new ArrayList<String>();
private static NavigableMap<byte[], Integer> scopes;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
protected static List<String> files = new ArrayList<String>();
protected static NavigableMap<byte[], Integer> scopes;
protected static void setupZkAndReplication() throws Exception {
// The implementing class should set up the conf
assertNotNull(conf);
zkw = new ZooKeeperWatcher(conf, "test", null);
ZKUtil.createWithParents(zkw, "/hbase/replication");
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
@ -347,7 +337,7 @@ public class TestReplicationSourceManager {
Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
@ -356,7 +346,7 @@ public class TestReplicationSourceManager {
manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
new Long(1), new Long(2)));
w1.start();
w1.join(5000);
w1.join(10000);
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
@ -365,92 +355,6 @@ public class TestReplicationSourceManager {
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
@Test
public void testNodeFailoverDeadServerParsing() throws Exception {
LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
server.getZooKeeper()));
repQueues.init(server.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
}
// create 3 DummyServers
Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
Map<String, Set<String>> testMap =
rq1.claimQueues(server.getServerName().getServerName());
ReplicationQueues rq2 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
s2.getZooKeeper()));
rq2.init(s2.getServerName().toString());
testMap = rq2.claimQueues(s1.getServerName().getServerName());
ReplicationQueues rq3 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
s3.getZooKeeper()));
rq3.init(s3.getServerName().toString());
testMap = rq3.claimQueues(s2.getServerName().getServerName());
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next());
List<String> result = replicationQueueInfo.getDeadRegionServers();
// verify
assertTrue(result.contains(server.getServerName().getServerName()));
assertTrue(result.contains(s1.getServerName().getServerName()));
assertTrue(result.contains(s2.getServerName().getServerName()));
server.abort("", null);
}
@Test
public void testFailoverDeadServerCversionChange() throws Exception {
LOG.debug("testFailoverDeadServerCversionChange");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server s0 = new DummyServer("cversion-change0.example.org");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
s0.getZooKeeper()));
repQueues.init(s0.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
}
// simulate queue transfer
Server s1 = new DummyServer("cversion-change1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationQueuesClientZKImpl client =
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
int v0 = client.getQueuesZNodeCversion();
rq1.claimQueues(s0.getServerName().getServerName());
int v1 = client.getQueuesZNodeCversion();
// cversion should increased by 1 since a child node is deleted
assertEquals(v0 + 1, v1);
s0.abort("", null);
}
@Test
public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

View File

@ -0,0 +1,152 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and
* ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
* TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
setupZkAndReplication();
}
// Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
@Test
public void testNodeFailoverDeadServerParsing() throws Exception {
LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
server.getZooKeeper()));
repQueues.init(server.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
}
// create 3 DummyServers
Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
Map<String, Set<String>> testMap =
rq1.claimQueues(server.getServerName().getServerName());
ReplicationQueues rq2 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
s2.getZooKeeper()));
rq2.init(s2.getServerName().toString());
testMap = rq2.claimQueues(s1.getServerName().getServerName());
ReplicationQueues rq3 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
s3.getZooKeeper()));
rq3.init(s3.getServerName().toString());
testMap = rq3.claimQueues(s2.getServerName().getServerName());
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.keySet().iterator().next());
List<String> result = replicationQueueInfo.getDeadRegionServers();
// verify
assertTrue(result.contains(server.getServerName().getServerName()));
assertTrue(result.contains(s1.getServerName().getServerName()));
assertTrue(result.contains(s2.getServerName().getServerName()));
server.stop("");
}
@Test
public void testFailoverDeadServerCversionChange() throws Exception {
LOG.debug("testFailoverDeadServerCversionChange");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
final Server s0 = new DummyServer("cversion-change0.example.org");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
s0.getZooKeeper()));
repQueues.init(s0.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
}
// simulate queue transfer
Server s1 = new DummyServer("cversion-change1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationQueuesClientZKImpl client =
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
int v0 = client.getQueuesZNodeCversion();
rq1.claimQueues(s0.getServerName().getServerName());
int v1 = client.getQueuesZNodeCversion();
// cversion should increase by 1 since a child node is deleted
assertEquals(v0 + 1, v1);
s0.stop("");
}
}

View File

@ -0,0 +1,60 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesClientImpl;
import org.apache.hadoop.hbase.replication.TableBasedReplicationQueuesImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* Tests the ReplicationSourceManager with TableBasedReplicationQueue's and
* TableBasedReplicationQueuesClient
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationSourceManager {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility = new HBaseTestingUtility(conf);
utility.startMiniCluster();
setupZkAndReplication();
}
}