listOfPeers = zkHelper.getListPeersForRS(rs, this);
+ // if rs just died, this will be null
+ if (listOfPeers == null) {
+ continue;
+ }
+ for (String id : listOfPeers) {
+ List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
+ if (peersHlogs != null) {
+ this.hlogs.addAll(peersHlogs);
+ }
+ // early exit if we found the log
+ if (lookForLog && this.hlogs.contains(searchedLog)) {
+ LOG.debug("Found log in ZK, keeping: " + searchedLog);
+ return true;
+ }
+ }
+ }
+ LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
+ return false;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ this.ttlCleaner = new TimeToLiveLogCleaner();
+ this.ttlCleaner.setConf(conf);
+ try {
+ this.zkHelper = new ReplicationZookeeperWrapper(
+ ZooKeeperWrapper.createInstance(this.conf,
+ HMaster.class.getName()),
+ this.conf, new AtomicBoolean(true), null);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ refreshHLogsAndSearch(null);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void process(WatchedEvent watchedEvent) {}
+}
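
For orientation, the cleaner logic above reduces to: keep an HLog if any
peer's queue in ZK still references it, and tolerate peers whose znodes
vanished mid-scan. A minimal, self-contained sketch of that decision, with a
plain map standing in for the ZK helper (LogKeepDecision and isLogReferenced
are hypothetical names):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class LogKeepDecision {

      // Keep an hlog only if some peer's queue still references it,
      // tolerating peers whose znode vanished mid-scan (null list).
      static boolean isLogReferenced(Map<String, List<String>> queues,
                                     String searchedLog) {
        Set<String> hlogs = new HashSet<String>();
        for (List<String> peerHlogs : queues.values()) {
          if (peerHlogs != null) {
            hlogs.addAll(peerHlogs);
          }
          if (hlogs.contains(searchedLog)) {
            return true; // early exit, like the cleaner above
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Map<String, List<String>> queues = new HashMap<String, List<String>>();
        queues.put("peer1", Arrays.asList("rs1.123", "rs1.456"));
        queues.put("peer2", null); // peer whose znode just disappeared
        System.out.println(isLogReferenced(queues, "rs1.456")); // true: keep
        System.out.println(isLogReferenced(queues, "rs1.789")); // false: delete
      }
    }
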
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/package.html b/src/main/java/org/apache/hadoop/hbase/replication/package.html
new file mode 100644
index 00000000000..f0146a1a2f4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/package.html
@@ -0,0 +1,128 @@
+
+
+
+
+
+
+
+Multi Cluster Replication
+This package provides replication between HBase clusters.
+
+
+
+Table Of Contents
+
+ - Status
+ - Requirements
+ - Deployment
+
+
+
+
+Status
+
+
+This package is alpha quality software and is only meant to be a base
+for future developments. The current implementation offers the following
+features:
+
+
+ - Master/Slave replication limited to 1 slave cluster.
+ - Replication of scoped families in user tables.
+ - Start/stop replication stream.
+ - Supports clusters of different sizes.
+ - Handling of partitions longer than 10 minutes.
+
+Please report bugs on the project's Jira when found.
+
+
+Requirements
+
+
+
+Before trying out replication, make sure to review the following requirements:
+
+
+ - Zookeeper should be managed by you, not by HBase, and should
+ always be available during the deployment.
+ - All machines from both clusters should be able to reach every
+ other machine since replication goes from any region server to any
+ other one on the slave cluster. That also includes the
+ Zookeeper clusters.
+ - Both clusters should have the same HBase and Hadoop major revision.
+ For example, having 0.21.1 on the master and 0.21.0 on the slave is
+ correct but not 0.21.1 and 0.22.0.
+ - Every table that contains families that are scoped for replication
+ must exist on every cluster with the exact same name, and the same goes
+ for those replicated families (see the example after this list).
+
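+
+As an illustration, a family can be scoped for replication from the Java
+client before the table is created on both clusters. This is only a sketch
+with made-up table and family names, and it assumes the matching
+REPLICATION_SCOPE_GLOBAL constant in HConstants:
+
+HTableDescriptor htd = new HTableDescriptor("usertable");
+HColumnDescriptor fam = new HColumnDescriptor("edits");
+fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); // anything but LOCAL
+htd.addFamily(fam);
+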
+
+
+
+Deployment
+
+
+
+The following steps describe how to enable replication from one cluster
+to another. This must be done with both clusters offline.
+
+ - Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
+ the following configurations:
+
+<property>
+ <name>hbase.replication.enabled</name>
+ <value>true</value>
+</property>
+
+ - Run the following command on any cluster:
+
+$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/replication/bin/add_peer.rb
+ This will print the help needed to set up the replication stream between
+ both clusters. If both clusters use the same Zookeeper cluster, you have
+ to use a different zookeeper.znode.parent for each since they can't
+ write in the same folder.
+
+ - You can now start and stop the clusters with your preferred method.
+
+
+You can confirm that your setup works by looking at any region server's log
+on the master cluster and looking for the following lines:
+
+
+Considering 1 rs, with ratio 0.1
+Getting 1 rs from peer cluster # 0
+Choosing peer 10.10.1.49:62020
+
+In this case it indicates that 1 region server from the slave cluster
+was chosen for replication.
+
+Should you want to stop the replication while the clusters are running, open
+the shell on the master cluster and issue this command:
+
+hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'
+
+Where you replace the znode parent with the one configured on your master
+cluster. Edits that were already queued will still be replicated after you
+issue that command, but new entries won't be. To start replication again,
+simply replace "false" with "true" in the command.
+
+
+
+
+
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
new file mode 100644
index 00000000000..a037aae58f4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Replication serves as an umbrella over the setup of replication and
+ * is used by HRS.
+ */
+public class Replication implements LogEntryVisitor {
+
+ private final boolean replication;
+ private final ReplicationSourceManager replicationManager;
+ private boolean replicationMaster;
+ private final AtomicBoolean replicating = new AtomicBoolean(true);
+ private final ReplicationZookeeperWrapper zkHelper;
+ private final Configuration conf;
+ private final AtomicBoolean stopRequested;
+ private ReplicationSink replicationSink;
+
+ /**
+ * Instantiate the replication management (if rep is enabled).
+ * @param conf conf to use
+ * @param hsi the info of this region server
+ * @param fs handle to the filesystem
+ * @param oldLogDir directory where logs are archived
+ * @param stopRequested boolean that tells us if we are shutting down
+ * @throws IOException
+ */
+ public Replication(Configuration conf, HServerInfo hsi,
+ FileSystem fs, Path oldLogDir,
+ AtomicBoolean stopRequested) throws IOException {
+ this.conf = conf;
+ this.stopRequested = stopRequested;
+ this.replication =
+ conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
+ if (replication) {
+ this.zkHelper = new ReplicationZookeeperWrapper(
+ ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
+ this.replicating, hsi.getServerName());
+ this.replicationMaster = zkHelper.isReplicationMaster();
+ this.replicationManager = this.replicationMaster ?
+ new ReplicationSourceManager(zkHelper, conf, stopRequested,
+ fs, this.replicating, oldLogDir) : null;
+ } else {
+ replicationManager = null;
+ zkHelper = null;
+ }
+ }
+
+ /**
+ * Join with the replication threads
+ */
+ public void join() {
+ if (this.replication) {
+ if (this.replicationMaster) {
+ this.replicationManager.join();
+ }
+ this.zkHelper.deleteOwnRSZNode();
+ }
+ }
+
+ /**
+ * Carry on the list of log entries down to the sink
+ * @param entries list of entries to replicate
+ * @throws IOException
+ */
+ public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ if (this.replication && !this.replicationMaster) {
+ this.replicationSink.replicateEntries(entries);
+ }
+ }
+
+ /**
+ * If replication is enabled and this cluster is a master,
+ * it starts the replication manager; if it is a slave, it creates the sink
+ * @throws IOException
+ */
+ public void startReplicationServices() throws IOException {
+ if (this.replication) {
+ if (this.replicationMaster) {
+ this.replicationManager.init();
+ } else {
+ this.replicationSink =
+ new ReplicationSink(this.conf, this.stopRequested);
+ }
+ }
+ }
+
+ /**
+ * Get the replication sources manager
+ * @return the manager if replication is enabled, else returns null
+ */
+ public ReplicationSourceManager getReplicationManager() {
+ return replicationManager;
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) {
+ NavigableMap<byte[], Integer> scopes =
+ new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ byte[] family;
+ for (KeyValue kv : logEdit.getKeyValues()) {
+ family = kv.getFamily();
+ int scope = info.getTableDesc().getFamily(family).getScope();
+ if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
+ !scopes.containsKey(family)) {
+ scopes.put(family, scope);
+ }
+ }
+ if (!scopes.isEmpty()) {
+ logEdit.setScopes(scopes);
+ }
+ }
+
+ /**
+ * Add this class as a log entry visitor for HLog if replication is enabled
+ * @param hlog log on which to add ourselves as a visitor
+ */
+ public void addLogEntryVisitor(HLog hlog) {
+ if (replication) {
+ hlog.addLogEntryVisitor(this);
+ }
+ }
+}
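
To make the flow above concrete, here is a hedged sketch of how a region
server could wire this umbrella class in; the surrounding fields (conf,
serverInfo, fs, oldLogDir, stopRequested, hlog) are assumptions, while the
methods are the ones from this patch:

    Replication replication =
        new Replication(conf, serverInfo, fs, oldLogDir, stopRequested);
    // Tag outgoing WALEdits with their replication scopes on each append.
    replication.addLogEntryVisitor(hlog);
    // Master cluster: start the sources. Slave cluster: create the sink.
    replication.startReplicationServices();
    // ... serve traffic ...
    // On shutdown, wait for the source threads and delete our RS znode.
    replication.join();
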
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
new file mode 100644
index 00000000000..3eec1d70abd
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is responsible for replicating the edits coming
+ * from another cluster.
+ *
+ * This replication process currently waits for the edits to be applied
+ * before the method returns. This means that the replication of edits
+ * is synchronous (after reading from HLogs in ReplicationSource) and that a
+ * single region server cannot receive edits from two sources at the same time.
+ *
+ * This class uses the native HBase client in order to replicate entries.
+ *
+ *
+ * TODO make this class more like ReplicationSource wrt log handling
+ */
+public class ReplicationSink {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
+ // Name of the HDFS directory that contains the temporary rep logs
+ public static final String REPLICATION_LOG_DIR = ".replogs";
+ private final Configuration conf;
+ // Pool of HTables used to replicate entries
+ private final HTablePool pool;
+ // boolean coming from HRS to know when the process stops
+ private final AtomicBoolean stop;
+
+ /**
+ * Create a sink for replication
+ *
+ * @param conf conf object
+ * @param stopper boolean to tell this thread to stop
+ * @throws IOException thrown when HDFS goes bad or bad file name
+ */
+ public ReplicationSink(Configuration conf, AtomicBoolean stopper)
+ throws IOException {
+ this.conf = conf;
+ this.pool = new HTablePool(this.conf,
+ conf.getInt("replication.sink.htablepool.capacity", 10));
+ this.stop = stopper;
+ }
+
+ /**
+ * Replicate this array of entries directly into the local cluster
+ * using the native client.
+ *
+ * @param entries
+ * @throws IOException
+ */
+ public synchronized void replicateEntries(HLog.Entry[] entries)
+ throws IOException {
+ // Very simple optimization where we batch sequences of rows going
+ // to the same table.
+ try {
+ long totalReplicated = 0;
+ byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
+ List<Put> puts = new ArrayList<Put>();
+ for (HLog.Entry entry : entries) {
+ WALEdit edit = entry.getEdit();
+ List<KeyValue> kvs = edit.getKeyValues();
+ if (kvs.get(0).isDelete()) {
+ Delete delete = new Delete(kvs.get(0).getRow(),
+ kvs.get(0).getTimestamp(), null);
+ for (KeyValue kv : kvs) {
+ if (kv.isDeleteFamily()) {
+ delete.deleteFamily(kv.getFamily());
+ } else if (!kv.isEmptyColumn()) {
+ delete.deleteColumn(kv.getFamily(),
+ kv.getQualifier());
+ }
+ }
+ delete(entry.getKey().getTablename(), delete);
+ } else {
+ // Switching table, flush
+ if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
+ put(lastTable, puts);
+ }
+ // With mini-batching, we need to expect multiple rows per edit
+ byte[] lastKey = kvs.get(0).getRow();
+ Put put = new Put(kvs.get(0).getRow(),
+ kvs.get(0).getTimestamp());
+ for (KeyValue kv : kvs) {
+ if (!Bytes.equals(lastKey, kv.getRow())) {
+ puts.add(put);
+ put = new Put(kv.getRow(), kv.getTimestamp());
+ }
+ put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
+ lastKey = kv.getRow();
+ }
+ puts.add(put);
+ lastTable = entry.getKey().getTablename();
+ }
+ totalReplicated++;
+ }
+ put(lastTable, puts);
+ LOG.info("Total replicated: " + totalReplicated);
+ } catch (IOException ex) {
+ if (ex.getCause() instanceof TableNotFoundException) {
+ LOG.warn("Losing edits because: ", ex);
+ } else {
+ // Should we log rejected edits in a file for replay?
+ LOG.error("Unable to accept edit because", ex);
+ this.stop.set(true);
+ throw ex;
+ }
+ } catch (RuntimeException re) {
+ if (re.getCause() instanceof TableNotFoundException) {
+ LOG.warn("Losing edits because: ", re);
+ } else {
+ this.stop.set(true);
+ throw re;
+ }
+ }
+ }
+
+ /**
+ * Do the puts and handle the pool
+ * @param tableName table to insert into
+ * @param puts list of puts
+ * @throws IOException
+ */
+ private void put(byte[] tableName, List<Put> puts) throws IOException {
+ if (puts.isEmpty()) {
+ return;
+ }
+ HTableInterface table = null;
+ try {
+ table = this.pool.getTable(tableName);
+ table.put(puts);
+ puts.clear();
+ } finally {
+ if (table != null) {
+ this.pool.putTable(table);
+ }
+ }
+ }
+
+ /**
+ * Do the delete and handle the pool
+ * @param tableName table to delete in
+ * @param delete the delete to use
+ * @throws IOException
+ */
+ private void delete(byte[] tableName, Delete delete) throws IOException {
+ HTableInterface table = null;
+ try {
+ table = this.pool.getTable(tableName);
+ table.delete(delete);
+ } finally {
+ if (table != null) {
+ this.pool.putTable(table);
+ }
+ }
+ }
+}
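
On the slave side the RPC entry point can stay thin: it hands the shipped
batch to the sink, which applies it synchronously. A hedged sketch (the
handler name mirrors Replication.replicateLogEntries above; the sink field
is an assumption):

    private ReplicationSink replicationSink; // built at service startup

    public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
      // Returns only once the puts and deletes have been applied locally.
      this.replicationSink.replicateEntries(entries);
    }
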
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
new file mode 100644
index 00000000000..6b9dcb555a4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -0,0 +1,647 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Class that handles the source of a replication stream.
+ * Currently does not handle more than 1 slave cluster.
+ * For each slave cluster it selects a random number of peers
+ * using a replication ratio. For example, if replication ratio = 0.1
+ * and the slave cluster has 100 region servers, 10 will be selected.
+ *
+ * A stream is considered down when we cannot contact a region server on the
+ * peer cluster for more than 55 seconds by default.
+ *
+ *
+ */
+public class ReplicationSource extends Thread
+ implements ReplicationSourceInterface {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
+ // Lock to manage when a HLog is changing path (archiving)
+ private final ReentrantLock pathLock = new ReentrantLock();
+ // Queue of logs to process
+ private PriorityBlockingQueue<Path> queue;
+ // container of entries to replicate
+ private HLog.Entry[] entriesArray;
+ private HConnection conn;
+ // Helper class for zookeeper
+ private ReplicationZookeeperWrapper zkHelper;
+ private Configuration conf;
+ // ratio of region servers to choose from a slave cluster
+ private float ratio;
+ private Random random;
+ // should we replicate or not?
+ private AtomicBoolean replicating;
+ // id of the peer cluster this source replicates to
+ private String peerClusterId;
+ // The manager of all sources to which we ping back our progress
+ private ReplicationSourceManager manager;
+ // Should we stop everything?
+ private AtomicBoolean stop;
+ // List of chosen sinks (region servers)
+ private List<HServerAddress> currentPeers;
+ // How long should we sleep for each retry
+ private long sleepForRetries;
+ // Max size in bytes of entriesArray
+ private long replicationQueueSizeCapacity;
+ // Max number of entries in entriesArray
+ private int replicationQueueNbCapacity;
+ // Our reader for the current log
+ private HLog.Reader reader;
+ // Current position in the log
+ private long position = 0;
+ // Path of the current log
+ private volatile Path currentPath;
+ private FileSystem fs;
+ // id of this cluster
+ private byte clusterId;
+ // total number of edits we replicated
+ private long totalReplicatedEdits = 0;
+ // The znode we currently play with
+ private String peerClusterZnode;
+ // Indicates if this queue is recovered (and will be deleted when depleted)
+ private boolean queueRecovered;
+ // Maximum number of retries before taking bold actions
+ private long maxRetriesMultiplier;
+ // Current number of entries that we need to replicate
+ private int currentNbEntries = 0;
+ // Indicates if this particular source is running
+ private volatile boolean running = true;
+
+ /**
+ * Instantiation method used by region servers
+ *
+ * @param conf configuration to use
+ * @param fs file system to use
+ * @param manager replication manager to ping to
+ * @param stopper the atomic boolean to use to stop the regionserver
+ * @param replicating the atomic boolean that starts/stops replication
+ * @param peerClusterZnode the name of our znode
+ * @throws IOException
+ */
+ public void init(final Configuration conf,
+ final FileSystem fs,
+ final ReplicationSourceManager manager,
+ final AtomicBoolean stopper,
+ final AtomicBoolean replicating,
+ final String peerClusterZnode)
+ throws IOException {
+ this.stop = stopper;
+ this.conf = conf;
+ this.replicationQueueSizeCapacity =
+ this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
+ this.replicationQueueNbCapacity =
+ this.conf.getInt("replication.source.nb.capacity", 25000);
+ this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
+ for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
+ this.entriesArray[i] = new HLog.Entry();
+ }
+ this.maxRetriesMultiplier =
+ this.conf.getLong("replication.source.maxretriesmultiplier", 10);
+ this.queue =
+ new PriorityBlockingQueue<Path>(
+ conf.getInt("hbase.regionserver.maxlogs", 32),
+ new LogsComparator());
+ this.conn = HConnectionManager.getConnection(conf);
+ this.zkHelper = manager.getRepZkWrapper();
+ this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
+ this.currentPeers = new ArrayList<HServerAddress>();
+ this.random = new Random();
+ this.replicating = replicating;
+ this.manager = manager;
+ this.sleepForRetries =
+ this.conf.getLong("replication.source.sleepforretries", 1000);
+ this.fs = fs;
+ this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+
+ // Finally look if this is a recovered queue
+ this.checkIfQueueRecovered(peerClusterZnode);
+ }
+
+ // The passed znode will either be the id of the peer cluster or,
+ // for a recovered queue, its full history in the form id-startcode-*
+ private void checkIfQueueRecovered(String peerClusterZnode) {
+ String[] parts = peerClusterZnode.split("-");
+ this.queueRecovered = parts.length != 1;
+ this.peerClusterId = this.queueRecovered ?
+ parts[0] : peerClusterZnode;
+ this.peerClusterZnode = peerClusterZnode;
+ }
+
+ /**
+ * Select a number of peers at random using the ratio. Minimum 1.
+ */
+ private void chooseSinks() {
+ this.currentPeers.clear();
+ List<HServerAddress> addresses =
+ this.zkHelper.getPeersAddresses(peerClusterId);
+ Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
+ int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
+ LOG.info("Getting " + nbPeers +
+ " rs from peer cluster # " + peerClusterId);
+ for (int i = 0; i < nbPeers; i++) {
+ HServerAddress address;
+ // Make sure we get one address that we don't already have
+ do {
+ address = addresses.get(this.random.nextInt(addresses.size()));
+ } while (setOfAddr.contains(address));
+ LOG.info("Choosing peer " + address);
+ setOfAddr.add(address);
+ }
+ this.currentPeers.addAll(setOfAddr);
+ }
+
+ @Override
+ public void enqueueLog(Path log) {
+ this.queue.put(log);
+ }
+
+ @Override
+ public void logArchived(Path oldPath, Path newPath) {
+ // in sync with queue polling
+ this.pathLock.lock();
+ try {
+ if (oldPath.equals(this.currentPath)) {
+ this.currentPath = newPath;
+ LOG.debug("Current log moved, changing currentPath to " +newPath);
+ return;
+ }
+
+ boolean present = this.queue.remove(oldPath);
+ LOG.debug("old log was " + (present ?
+ "present, changing the queue" : "already processed"));
+ if (present) {
+ this.queue.add(newPath);
+ }
+ } finally {
+ this.pathLock.unlock();
+ }
+ }
+
+ @Override
+ public void run() {
+ connectToPeers();
+ // We were stopped while looping to connect to sinks, just abort
+ if (this.stop.get()) {
+ return;
+ }
+ // If this is recovered, the queue is already full and the first log
+ // normally has a position (unless the RS failed between 2 logs)
+ if (this.queueRecovered) {
+ this.position = this.zkHelper.getHLogRepPosition(
+ this.peerClusterZnode, this.queue.peek().getName());
+ }
+ int sleepMultiplier = 1;
+ // Loop until we close down
+ while (!stop.get() && this.running) {
+
+ // In sync with logArchived
+ this.pathLock.lock();
+ try {
+ // Get a new path
+ if (!getNextPath()) {
+ if (sleepForRetries("No log to process", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ // Open a reader on it
+ if (!openReader(sleepMultiplier)) {
+ continue;
+ }
+ } finally {
+ this.pathLock.unlock();
+ }
+
+ // If we got a null reader but didn't continue, then sleep and continue
+ if (this.reader == null) {
+ if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+
+ boolean gotIOE = false; // TODO this is a hack for HDFS-1057
+ currentNbEntries = 0;
+ try {
+ if(readAllEntriesToReplicateOrNextFile()) {
+ continue;
+ }
+ } catch (IOException ioe) {
+ LOG.warn(peerClusterZnode + " Got: ", ioe);
+ gotIOE = true;
+ if (ioe.getCause() instanceof EOFException) {
+
+ boolean considerDumping = false;
+ if (this.queueRecovered) {
+ try {
+ FileStatus stat = this.fs.getFileStatus(this.currentPath);
+ if (stat.getLen() == 0) {
+ LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+ }
+ considerDumping = true;
+ } catch (IOException e) {
+ LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+ }
+ } else if (currentNbEntries != 0) {
+ LOG.warn(peerClusterZnode + " Got EOF while reading, " +
+ "looks like this file is broken? " + currentPath);
+ considerDumping = true;
+ currentNbEntries = 0;
+ }
+
+ if (considerDumping &&
+ sleepMultiplier == this.maxRetriesMultiplier &&
+ processEndOfFile()) {
+ continue;
+ }
+ }
+ } finally {
+ try {
+ // if current path is null, we just processed the end of the file,
+ // so don't record a position from the closed reader
+ if (this.currentPath != null && !gotIOE) {
+ this.position = this.reader.getPosition();
+ }
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ } catch (IOException e) {
+ gotIOE = true;
+ LOG.warn("Unable to finalize the tailing of a file", e);
+ }
+ }
+
+ // If we didn't get anything to replicate, or if we hit a IOE,
+ // wait a bit and retry.
+ // But if we need to stop, don't bother sleeping
+ if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
+ if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ sleepMultiplier = 1;
+ shipEdits();
+
+ }
+ LOG.debug("Source exiting " + peerClusterId);
+ }
+
+ /**
+ * Read all the entries from the current log files and retain those
+ * that need to be replicated. Else, process the end of the current file.
+ * @return true if we got nothing and went to the next file, false if we got
+ * entries
+ * @throws IOException
+ */
+ protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+ long seenEntries = 0;
+ if (this.position != 0) {
+ this.reader.seek(this.position);
+ }
+ HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
+ while (entry != null) {
+ WALEdit edit = entry.getEdit();
+ seenEntries++;
+ // Remove all KVs that should not be replicated
+ removeNonReplicableEdits(edit);
+ HLogKey logKey = entry.getKey();
+ // Don't replicate catalog entries, WALEdits that contain
+ // nothing to replicate, or anything while replication is disabled
+ if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
+ Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
+ edit.size() != 0 && replicating.get()) {
+ logKey.setClusterId(this.clusterId);
+ currentNbEntries++;
+ }
+ // Stop if too many entries or too big
+ if ((this.reader.getPosition() - this.position)
+ >= this.replicationQueueSizeCapacity ||
+ currentNbEntries >= this.replicationQueueNbCapacity) {
+ break;
+ }
+ entry = this.reader.next(entriesArray[currentNbEntries]);
+ }
+ LOG.debug("currentNbEntries:" + currentNbEntries +
+ " and seenEntries:" + seenEntries +
+ " and size: " + (this.reader.getPosition() - this.position));
+ // If we didn't get anything and the queue has an object, it means we
+ // hit the end of the file for sure
+ return seenEntries == 0 && processEndOfFile();
+ }
+
+ private void connectToPeers() {
+ // Connect to peer cluster first, unless we have to stop
+ while (!this.stop.get() && this.currentPeers.size() == 0) {
+ try {
+ chooseSinks();
+ Thread.sleep(this.sleepForRetries);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while trying to connect to sinks", e);
+ }
+ }
+ }
+
+ /**
+ * Poll for the next path
+ * @return true if a path was obtained, false if not
+ */
+ protected boolean getNextPath() {
+ try {
+ if (this.currentPath == null) {
+ this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while reading edits", e);
+ }
+ return this.currentPath != null;
+ }
+
+ /**
+ * Open a reader on the current path
+ *
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return true if we should continue with that file, false if we are over with it
+ */
+ protected boolean openReader(int sleepMultiplier) {
+ try {
+ LOG.info("Opening log for replication " + this.currentPath.getName() + " at " + this.position);
+ this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+ } catch (IOException ioe) {
+ this.reader = null;
+ LOG.warn(peerClusterZnode + " Got: ", ioe);
+ // TODO Need a better way to determine if a file is really gone but
+ // TODO without scanning all logs dir
+ if (sleepMultiplier == this.maxRetriesMultiplier) {
+ LOG.warn("Waited too long for this file, considering dumping");
+ return !processEndOfFile();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Do the sleeping logic
+ * @param msg Why we sleep
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return true if sleepMultiplier is < maxRetriesMultiplier, false otherwise
+ */
+ protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+ try {
+ LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping between retries");
+ }
+ return sleepMultiplier < maxRetriesMultiplier;
+ }
+
+ /**
+ * We only want KVs that are scoped other than local
+ * @param edit the edit whose KVs are checked for replication
+ */
+ protected void removeNonReplicableEdits(WALEdit edit) {
+ NavigableMap<byte[], Integer> scopes = edit.getScopes();
+ List<KeyValue> kvs = edit.getKeyValues();
+ for (int i = 0; i < edit.size(); i++) {
+ KeyValue kv = kvs.get(i);
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+ kvs.remove(i);
+ i--;
+ }
+ }
+ }
+
+ /**
+ * Do the shipping logic
+ */
+ protected void shipEdits() {
+ int sleepMultiplier = 1;
+ while (!stop.get()) {
+ try {
+ HRegionInterface rrs = getRS();
+ LOG.debug("Replicating " + currentNbEntries);
+ rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
+ this.manager.logPositionAndCleanOldLogs(this.currentPath,
+ this.peerClusterZnode, this.position, queueRecovered);
+ this.totalReplicatedEdits += currentNbEntries;
+ LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
+ break;
+
+ } catch (IOException ioe) {
+ LOG.warn("Unable to replicate because ", ioe);
+ try {
+ boolean down;
+ do {
+ down = isSlaveDown();
+ if (down) {
+ LOG.debug("The region server we tried to ping didn't answer, " +
+ "sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier++;
+ } else {
+ chooseSinks();
+ }
+ }
+ } while (!stop.get() && down);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while trying to contact the peer cluster");
+ }
+
+ }
+ }
+ }
+
+ /**
+ * If the queue isn't empty, switch to the next one
+ * Else if this is a recovered queue, it means we're done!
+ * Else we'll just continue to try reading the log file
+ * @return true if we're done with the current file, false if we should
+ * continue trying to read from it
+ */
+ protected boolean processEndOfFile() {
+ this.pathLock.lock();
+ try {
+ if (this.queue.size() != 0) {
+ this.currentPath = null;
+ this.position = 0;
+ return true;
+ } else if (this.queueRecovered) {
+ this.manager.closeRecoveredQueue(this);
+ this.abort();
+ return true;
+ }
+ } finally {
+ this.pathLock.unlock();
+ }
+ return false;
+ }
+
+ public void startup() {
+ String n = Thread.currentThread().getName();
+ Thread.UncaughtExceptionHandler handler =
+ new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(final Thread t, final Throwable e) {
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ abort();
+ }
+ };
+ Threads.setDaemonThreadRunning(
+ this, n + ".replicationSource," + clusterId, handler);
+ }
+
+ /**
+ * Hastily stop the replication, then wait for shutdown
+ */
+ private void abort() {
+ LOG.info("abort");
+ this.running = false;
+ terminate();
+ }
+
+ public void terminate() {
+ LOG.info("terminate");
+ Threads.shutdown(this, this.sleepForRetries);
+ }
+
+ /**
+ * Get a new region server at random from this peer
+ * @return a connection to a random region server among the chosen peers
+ * @throws IOException
+ */
+ private HRegionInterface getRS() throws IOException {
+ if (this.currentPeers.size() == 0) {
+ throw new IOException(this.peerClusterZnode + " has 0 region servers");
+ }
+ HServerAddress address =
+ currentPeers.get(random.nextInt(this.currentPeers.size()));
+ return this.conn.getHRegionConnection(address);
+ }
+
+ /**
+ * Check if the slave is down by trying to establish a connection
+ * @return true if down, false if up
+ * @throws InterruptedException
+ */
+ public boolean isSlaveDown() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread pingThread = new Thread() {
+ public void run() {
+ try {
+ HRegionInterface rrs = getRS();
+ // Dummy call which should fail
+ rrs.getHServerInfo();
+ latch.countDown();
+ } catch (IOException ex) {
+ LOG.info("Slave cluster looks down: " + ex.getMessage());
+ }
+ }
+ };
+ pingThread.start();
+ // await returns true only if countDown happened before the timeout
+ boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
+ pingThread.interrupt();
+ return down;
+ }
+
+ /**
+ * Get the id that the source is replicating to
+ *
+ * @return peer cluster id
+ */
+ public String getPeerClusterZnode() {
+ return this.peerClusterZnode;
+ }
+
+ /**
+ * Get the path of the current HLog
+ * @return current hlog's path
+ */
+ public Path getCurrentPath() {
+ return this.currentPath;
+ }
+
+ /**
+ * Comparator used to compare logs together based on their start time
+ */
+ public static class LogsComparator implements Comparator<Path> {
+
+ @Override
+ public int compare(Path o1, Path o2) {
+ return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return true;
+ }
+
+ /**
+ * Split a path to get the start time
+ * For example: 10.20.20.171%3A60020.1277499063250
+ * @param p path to split
+ * @return start time
+ */
+ private long getTS(Path p) {
+ String[] parts = p.getName().split("\\.");
+ return Long.parseLong(parts[parts.length-1]);
+ }
+ }
+
+}
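
The log ordering above relies only on the timestamp suffix of HLog file
names. A self-contained sketch of that parsing, reusing the sample name from
the LogsComparator javadoc (LogNameOrdering is a hypothetical class):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class LogNameOrdering {
      // Same parsing as LogsComparator.getTS: the suffix after the last
      // '.' in an hlog name is the log's start timestamp.
      static long getTS(String logName) {
        String[] parts = logName.split("\\.");
        return Long.parseLong(parts[parts.length - 1]);
      }

      public static void main(String[] args) {
        Comparator<String> byStartTime = new Comparator<String>() {
          public int compare(String a, String b) {
            return Long.valueOf(getTS(a)).compareTo(getTS(b));
          }
        };
        PriorityQueue<String> queue =
            new PriorityQueue<String>(2, byStartTime);
        queue.add("10.20.20.171%3A60020.1277499063250");
        queue.add("10.20.20.171%3A60020.1277499001000");
        System.out.println(queue.poll()); // oldest log comes out first
      }
    }
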
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
new file mode 100644
index 00000000000..1d7ae7edb62
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Interface that defines a replication source
+ */
+public interface ReplicationSourceInterface {
+
+ /**
+ * Initializer for the source
+ * @param conf the configuration to use
+ * @param fs the file system to use
+ * @param manager the manager to use
+ * @param stopper the stopper object for this region server
+ * @param replicating the status of the replication on this cluster
+ * @param peerClusterId the id of the peer cluster
+ * @throws IOException
+ */
+ public void init(final Configuration conf,
+ final FileSystem fs,
+ final ReplicationSourceManager manager,
+ final AtomicBoolean stopper,
+ final AtomicBoolean replicating,
+ final String peerClusterId) throws IOException;
+
+ /**
+ * Add a log to the list of logs to replicate
+ * @param log path to the log to replicate
+ */
+ public void enqueueLog(Path log);
+
+ /**
+ * Get the current log that's replicated
+ * @return the current log
+ */
+ public Path getCurrentPath();
+
+ /**
+ * Start the replication
+ */
+ public void startup();
+
+ /**
+ * End the replication
+ */
+ public void terminate();
+
+ /**
+ * Get the id that the source is replicating to
+ *
+ * @return peer cluster id
+ */
+ public String getPeerClusterZnode();
+
+ /**
+ * Notify this source that a log was archived
+ * @param oldPath old path of the log
+ * @param newPath new path of the log (archive)
+ */
+ public void logArchived(Path oldPath, Path newPath);
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
new file mode 100644
index 00000000000..2e13a0a8bfa
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is responsible for managing all the replication
+ * sources. There are two classes of sources:
+ * Normal sources are persistent, one per peer cluster.
+ * Old sources are recovered from a failed region server; their
+ * only goal is to finish replicating the HLog queue it had up in ZK.
+ *
+ * When a region server dies, this class uses a watcher to get notified and it
+ * tries to grab a lock in order to transfer all the queues in a local
+ * old source.
+ */
+public class ReplicationSourceManager implements LogActionsListener {
+
+ private static final Log LOG =
+ LogFactory.getLog(ReplicationSourceManager.class);
+ // List of all the sources that read this RS's logs
+ private final List<ReplicationSourceInterface> sources;
+ // List of all the sources we got from dead RSs
+ private final List<ReplicationSourceInterface> oldsources;
+ // Indicates if we are currently replicating
+ private final AtomicBoolean replicating;
+ // Helper for zookeeper
+ private final ReplicationZookeeperWrapper zkHelper;
+ // Indicates if the region server is closing
+ private final AtomicBoolean stopper;
+ // All logs we are currently tracking
+ private final SortedSet<String> hlogs;
+ private final Configuration conf;
+ private final FileSystem fs;
+ // The path to the latest log we saw, for newly created sources
+ private Path latestPath;
+ // List of all the other region servers in this cluster
+ private final List<String> otherRegionServers;
+ // Path to the hlog archive
+ private final Path oldLogDir;
+
+ /**
+ * Creates a replication manager and sets the watch on all the other
+ * registered region servers
+ * @param zkHelper the zk helper for replication
+ * @param conf the configuration to use
+ * @param stopper the stopper object for this region server
+ * @param fs the file system to use
+ * @param replicating the status of the replication on this cluster
+ * @param oldLogDir the directory where old logs are archived
+ */
+ public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
+ final Configuration conf,
+ final AtomicBoolean stopper,
+ final FileSystem fs,
+ final AtomicBoolean replicating,
+ final Path oldLogDir) {
+ this.sources = new ArrayList<ReplicationSourceInterface>();
+ this.replicating = replicating;
+ this.zkHelper = zkHelper;
+ this.stopper = stopper;
+ this.hlogs = new TreeSet<String>();
+ this.oldsources = new ArrayList<ReplicationSourceInterface>();
+ this.conf = conf;
+ this.fs = fs;
+ this.oldLogDir = oldLogDir;
+ List<String> otherRSs =
+ this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+ this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
+ }
+
+ /**
+ * Provide the id of the peer and a log key and this method will figure which
+ * hlog it belongs to and will log, for this region server, the current
+ * position. It will also clean old logs from the queue.
+ * @param log Path to the log currently being replicated
+ * @param id id of the peer cluster
+ * @param position current location in the log
+ * @param queueRecovered indicates if this queue comes from another region server
+ */
+ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+ String key = log.getName();
+ LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
+ this.zkHelper.writeReplicationStatus(key, id, position);
+ synchronized (this.hlogs) {
+ if (!queueRecovered && !this.hlogs.first().equals(key)) {
+ SortedSet<String> hlogSet = this.hlogs.headSet(key);
+ LOG.info("Removing " + hlogSet.size() +
+ " logs in the list: " + hlogSet);
+ for (String hlog : hlogSet) {
+ this.zkHelper.removeLogFromList(hlog, id);
+ }
+ hlogSet.clear();
+ }
+ }
+ }
+
+ /**
+ * Adds a normal source per registered peer cluster and tries to process all
+ * old region server hlog queues
+ */
+ public void init() throws IOException {
+ for (String id : this.zkHelper.getPeerClusters().keySet()) {
+ ReplicationSourceInterface src = addSource(id);
+ src.startup();
+ }
+ List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+ if (currentReplicators == null) {
+ currentReplicators = new ArrayList<String>();
+ }
+ synchronized (otherRegionServers) {
+ LOG.info("Current list of replicators: " + currentReplicators
+ + " other RSs: " + otherRegionServers);
+ }
+ // Look if there's anything to process after a restart
+ for (String rs : currentReplicators) {
+ synchronized (otherRegionServers) {
+ if (!this.otherRegionServers.contains(rs)) {
+ transferQueues(rs);
+ }
+ }
+ }
+ }
+
+ /**
+ * Add a new normal source to this region server
+ * @param id the id of the peer cluster
+ * @return the created source
+ * @throws IOException
+ */
+ public ReplicationSourceInterface addSource(String id) throws IOException {
+ ReplicationSourceInterface src =
+ getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+ this.sources.add(src);
+ synchronized (this.hlogs) {
+ if (this.hlogs.size() > 0) {
+ this.zkHelper.addLogToList(this.hlogs.first(),
+ this.sources.get(0).getPeerClusterZnode());
+ src.enqueueLog(this.latestPath);
+ }
+ }
+ return src;
+ }
+
+ /**
+ * Terminate the replication on this region server
+ */
+ public void join() {
+ if (this.sources.size() == 0) {
+ this.zkHelper.deleteOwnRSZNode();
+ }
+ for (ReplicationSourceInterface source : this.sources) {
+ source.terminate();
+ }
+ }
+
+ /**
+ * Get a copy of the hlogs of the first source on this rs
+ * @return a sorted set of hlog names
+ */
+ protected SortedSet<String> getHLogs() {
+ return new TreeSet<String>(this.hlogs);
+ }
+
+ /**
+ * Get a list of all the normal sources of this rs
+ * @return list of all normal sources
+ */
+ public List<ReplicationSourceInterface> getSources() {
+ return this.sources;
+ }
+
+ @Override
+ public void logRolled(Path newLog) {
+ if (this.sources.size() > 0) {
+ this.zkHelper.addLogToList(newLog.getName(),
+ this.sources.get(0).getPeerClusterZnode());
+ }
+ synchronized (this.hlogs) {
+ this.hlogs.add(newLog.getName());
+ }
+ this.latestPath = newLog;
+ for (ReplicationSourceInterface source : this.sources) {
+ source.enqueueLog(newLog);
+ }
+ }
+
+ @Override
+ public void logArchived(Path oldPath, Path newPath) {
+ for (ReplicationSourceInterface source : this.sources) {
+ source.logArchived(oldPath, newPath);
+ }
+ }
+
+ /**
+ * Get the ZK help of this manager
+ * @return the helper
+ */
+ public ReplicationZookeeperWrapper getRepZkWrapper() {
+ return zkHelper;
+ }
+
+ /**
+ * Factory method to create a replication source
+ * @param conf the configuration to use
+ * @param fs the file system to use
+ * @param manager the manager to use
+ * @param stopper the stopper object for this region server
+ * @param replicating the status of the replication on this cluster
+ * @param peerClusterId the id of the peer cluster
+ * @return the created source
+ * @throws IOException
+ */
+ public ReplicationSourceInterface getReplicationSource(
+ final Configuration conf,
+ final FileSystem fs,
+ final ReplicationSourceManager manager,
+ final AtomicBoolean stopper,
+ final AtomicBoolean replicating,
+ final String peerClusterId) throws IOException {
+ ReplicationSourceInterface src;
+ try {
+ Class<?> c = Class.forName(conf.get("replication.replicationsource.implementation",
+ ReplicationSource.class.getCanonicalName()));
+ src = (ReplicationSourceInterface) c.newInstance();
+ } catch (Exception e) {
+ LOG.warn("Passed replication source implemention throws errors, " +
+ "defaulting to ReplicationSource", e);
+ src = new ReplicationSource();
+
+ }
+ src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+ return src;
+ }
+
+ /**
+ * Transfer all the queues of the specified region server to this one.
+ * First it tries to grab a lock and if it works it will move the
+ * znodes and finally will delete the old znodes.
+ *
+ * It creates one old source for any type of source of the old rs.
+ * @param rsZnode znode of the region server whose queues are transferred
+ */
+ public void transferQueues(String rsZnode) {
+ // We try to lock that rs' queue directory
+ if (this.stopper.get()) {
+ LOG.info("Not transferring queue since we are shutting down");
+ return;
+ }
+ if (!this.zkHelper.lockOtherRS(rsZnode)) {
+ return;
+ }
+ LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
+ SortedMap<String, SortedSet<String>> newQueues =
+ this.zkHelper.copyQueuesFromRS(rsZnode);
+ if (newQueues == null || newQueues.size() == 0) {
+ return;
+ }
+ this.zkHelper.deleteRsQueues(rsZnode);
+
+ for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+ String peerId = entry.getKey();
+ try {
+ ReplicationSourceInterface src = getReplicationSource(this.conf,
+ this.fs, this, this.stopper, this.replicating, peerId);
+ this.oldsources.add(src);
+ for (String hlog : entry.getValue()) {
+ src.enqueueLog(new Path(this.oldLogDir, hlog));
+ }
+ src.startup();
+ } catch (IOException e) {
+ // TODO manage it
+ LOG.error("Failed creating a source", e);
+ }
+ }
+ }
+
+ /**
+ * Clear the references to the specified old source
+ * @param src source to clear
+ */
+ public void closeRecoveredQueue(ReplicationSourceInterface src) {
+ LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+ this.oldsources.remove(src);
+ this.zkHelper.deleteSource(src.getPeerClusterZnode());
+ }
+
+ /**
+ * Watcher used to be notified of the other region server's death
+ * in the local cluster. It initiates the process to transfer the queues
+ * if it is able to grab the lock.
+ */
+ public class OtherRegionServerWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ LOG.info(" event " + watchedEvent);
+ if (watchedEvent.getState().equals(Event.KeeperState.Expired) ||
+ watchedEvent.getState().equals(Event.KeeperState.Disconnected)) {
+ return;
+ }
+
+ List<String> newRsList = zkHelper.getRegisteredRegionServers(this);
+ if (newRsList == null) {
+ return;
+ } else {
+ synchronized (otherRegionServers) {
+ otherRegionServers.clear();
+ otherRegionServers.addAll(newRsList);
+ }
+ }
+ if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+ LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
+ String[] rsZnodeParts = watchedEvent.getPath().split("/");
+ transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+ }
+ }
+ }
+
+}
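
logPositionAndCleanOldLogs above leans on TreeSet.headSet returning a live
view, so clearing the view also drops the replicated logs from the tracked
set. A self-contained sketch of that behavior with illustrative names:

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class OldLogCleanup {
      public static void main(String[] args) {
        SortedSet<String> hlogs = new TreeSet<String>();
        hlogs.add("rs.1277499001000");
        hlogs.add("rs.1277499063250");
        hlogs.add("rs.1277499100000");
        // Everything strictly before the log we just reported is done.
        SortedSet<String> done = hlogs.headSet("rs.1277499100000");
        System.out.println(done);  // [rs.1277499001000, rs.1277499063250]
        done.clear();              // headSet is a view: also shrinks hlogs
        System.out.println(hlogs); // [rs.1277499100000]
      }
    }
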
diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
index b26c071839e..69ab92a5d3f 100644
--- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
+++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
@@ -467,6 +467,7 @@ public class ZooKeeperWrapper implements Watcher {
private HServerAddress readAddress(String znode, Watcher watcher) {
try {
+ LOG.debug("<" + instanceName + ">" + "Trying to read " + znode);
return readAddressOrThrow(znode, watcher);
} catch (IOException e) {
LOG.debug("<" + instanceName + ">" + "Failed to read " + e.getMessage());
@@ -572,7 +573,7 @@ public class ZooKeeperWrapper implements Watcher {
if (recursive) {
LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode);
List znodes = this.zooKeeper.getChildren(znode, false);
- if (znodes.size() > 0) {
+ if (znodes != null && znodes.size() > 0) {
for (String child : znodes) {
String childFullPath = getZNode(znode, child);
LOG.info("<" + instanceName + ">" + "deleteZNode recursive call " + childFullPath);
@@ -914,10 +915,10 @@ public class ZooKeeperWrapper implements Watcher {
if (failOnWrite || stat == null) {
this.zooKeeper.create(path, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- LOG.debug("<" + instanceName + ">" + "Created " + path);
+ LOG.debug("<" + instanceName + ">" + "Created " + path + " with data " + strData);
} else {
this.zooKeeper.setData(path, data, -1);
- LOG.debug("<" + instanceName + ">" + "Updated " + path);
+ LOG.debug("<" + instanceName + ">" + "Updated " + path + " with data " + strData);
}
}
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index d865e385276..6a588814c9c 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -466,6 +466,7 @@ public class HBaseTestingUtility {
* @throws IOException
*/
public int loadTable(final HTable t, final byte[] f) throws IOException {
+ t.setAutoFlush(false);
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
@@ -481,6 +482,7 @@ public class HBaseTestingUtility {
}
}
}
+ t.flushCommits();
return rowCount;
}
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index e96637fc43b..211162e37f4 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -525,4 +525,47 @@ public class TestHLog extends HBaseTestCase {
}
}
}
+
+ /**
+ * Test that we can visit entries before they are appended
+ * @throws Exception
+ */
+ public void testVisitors() throws Exception {
+ final int COL_COUNT = 10;
+ final byte [] tableName = Bytes.toBytes("tablename");
+ final byte [] row = Bytes.toBytes("row");
+ this.conf.setBoolean("dfs.support.append", true);
+ HLog log = new HLog(this.fs, dir, this.oldLogDir, this.conf, null);
+ DumbLogEntriesVisitor visitor = new DumbLogEntriesVisitor();
+ log.addLogEntryVisitor(visitor);
+ long timestamp = System.currentTimeMillis();
+ HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ for (int i = 0; i < COL_COUNT; i++) {
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[]{(byte) (i + '0')}));
+ log.append(hri, tableName, cols, System.currentTimeMillis());
+ }
+ assertEquals(COL_COUNT, visitor.increments);
+ log.removeLogEntryVisitor(visitor);
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(11)),
+ timestamp, new byte[]{(byte) (11 + '0')}));
+ log.append(hri, tableName, cols, System.currentTimeMillis());
+ assertEquals(COL_COUNT, visitor.increments);
+ }
+
+ static class DumbLogEntriesVisitor implements LogEntryVisitor {
+
+ int increments = 0;
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) {
+ increments++;
+ }
+ }
}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
new file mode 100644
index 00000000000..ad9e8fbab67
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Source that does nothing at all, helpful for testing ReplicationSourceManager
+ */
+public class ReplicationSourceDummy implements ReplicationSourceInterface {
+
+ ReplicationSourceManager manager;
+ String peerClusterId;
+ Path currentPath;
+
+ @Override
+ public void init(Configuration conf, FileSystem fs,
+ ReplicationSourceManager manager, AtomicBoolean stopper,
+ AtomicBoolean replicating, String peerClusterId)
+ throws IOException {
+ this.manager = manager;
+ this.peerClusterId = peerClusterId;
+ }
+
+ @Override
+ public void enqueueLog(Path log) {
+ this.currentPath = log;
+ }
+
+ @Override
+ public Path getCurrentPath() {
+ return this.currentPath;
+ }
+
+ @Override
+ public void startup() {
+
+ }
+
+ @Override
+ public void terminate() {
+
+ }
+
+ @Override
+ public String getPeerClusterZnode() {
+ return peerClusterId;
+ }
+
+ @Override
+ public void logArchived(Path oldPath, Path newPath) {
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
new file mode 100644
index 00000000000..163671f430c
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -0,0 +1,478 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.EmptyWatcher;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestReplication {
+
+ private static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+ private static Configuration conf1;
+ private static Configuration conf2;
+
+ private static ZooKeeperWrapper zkw1;
+ private static ZooKeeperWrapper zkw2;
+
+ private static HTable htable1;
+ private static HTable htable2;
+
+ private static HBaseTestingUtility utility1;
+ private static HBaseTestingUtility utility2;
+ private static final int NB_ROWS_IN_BATCH = 100;
+ private static final long SLEEP_TIME = 500;
+ private static final int NB_RETRIES = 10;
+
+ private static final byte[] tableName = Bytes.toBytes("test");
+ private static final byte[] famName = Bytes.toBytes("f");
+ private static final byte[] row = Bytes.toBytes("row");
+ private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1 = HBaseConfiguration.create();
+ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ // smaller block size and capacity to trigger more operations and
+ // exercise those code paths
+ conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
+ conf1.setInt("replication.source.nb.capacity", 5);
+ conf1.setLong("replication.source.sleepforretries", 100);
+ conf1.setInt("hbase.regionserver.maxlogs", 10);
+ conf1.setLong("hbase.master.logcleaner.ttl", 10);
+ conf1.setLong("hbase.client.retries.number", 4);
+ conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ conf1.setBoolean("dfs.support.append", true);
+ conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
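+ // Lay out the replication znodes by hand: /1/replication/master holds this
+ // cluster's own address, and /1/replication/peers/1 (below) points at the slave.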
+ zkw1.writeZNode("/1", "replication", "");
+ zkw1.writeZNode("/1/replication", "master",
+ conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+ setIsReplication(true);
+
+ LOG.info("Setup first Zk");
+
+ conf2 = HBaseConfiguration.create();
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ conf2.setInt("hbase.client.retries.number", 6);
+ conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ conf2.setBoolean("dfs.support.append", true);
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
+ zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
+ zkw2.writeZNode("/2", "replication", "");
+ zkw2.writeZNode("/2/replication", "master",
+ conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf1.get("hbase.zookeeper.property.clientPort")+":/1");
+
+ zkw1.writeZNode("/1/replication/peers", "1",
+ conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+
+ LOG.info("Setup second Zk");
+
+ utility1.startMiniCluster(2);
+ utility2.startMiniCluster(2);
+
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ table.setDeferredLogFlush(false);
+ HColumnDescriptor fam = new HColumnDescriptor(famName);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ table.addFamily(fam);
+ fam = new HColumnDescriptor(noRepfamName);
+ table.addFamily(fam);
+ HBaseAdmin admin1 = new HBaseAdmin(conf1);
+ HBaseAdmin admin2 = new HBaseAdmin(conf2);
+ admin1.createTable(table);
+ admin2.createTable(table);
+
+ htable1 = new HTable(conf1, tableName);
+ htable1.setWriteBufferSize(1024*5);
+ htable2 = new HTable(conf2, tableName);
+ }
+
+ private static void setIsReplication(boolean rep) throws Exception {
+ LOG.info("Set rep " + rep);
+ zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
+ // Takes some ms for ZK to fire the watcher
+ Thread.sleep(SLEEP_TIME);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ setIsReplication(false);
+ utility1.truncateTable(tableName);
+ utility2.truncateTable(tableName);
+ // If the test is flaky, increase this sleep
+ Thread.sleep(SLEEP_TIME*8);
+ setIsReplication(true);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ utility2.shutdownMiniCluster();
+ utility1.shutdownMiniCluster();
+ }
+
+ /**
+ * Add a row, check it's replicated, delete it, check it's gone
+ * @throws Exception
+ */
+ @Test
+ public void testSimplePutDelete() throws Exception {
+ LOG.info("testSimplePutDelete");
+ Put put = new Put(row);
+ put.add(famName, row, row);
+
+ htable1 = new HTable(conf1, tableName);
+ htable1.put(put);
+
+ HTable table2 = new HTable(conf2, tableName);
+ Get get = new Get(row);
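+ // Poll the slave cluster until the row shows up or we run out of retries.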
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = table2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+
+ Delete del = new Delete(row);
+ htable1.delete(del);
+
+ table2 = new HTable(conf2, tableName);
+ get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for del replication");
+ }
+ Result res = table2.get(get);
+ if (res.size() >= 1) {
+ LOG.info("Row not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Try a small batch upload using the write buffer, check it's replicated
+ * @throws Exception
+ */
+ @Test
+ public void testSmallBatch() throws Exception {
+ LOG.info("testSmallBatch");
+ Put put;
+ // normal Batch tests
+ htable1.setAutoFlush(false);
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ put = new Put(Bytes.toBytes(i));
+ put.add(famName, row, row);
+ htable1.put(put);
+ }
+ htable1.flushCommits();
+
+ Scan scan = new Scan();
+
+ ResultScanner scanner1 = htable1.getScanner(scan);
+ Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
+ scanner1.close();
+ assertEquals(NB_ROWS_IN_BATCH, res1.length);
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for normal batch replication");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+ scanner.close();
+ if (res.length != NB_ROWS_IN_BATCH) {
+ LOG.info("Only got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+
+ htable1.setAutoFlush(true);
+
+ }
+
+ /**
+ * Test stopping replication: insert rows while it's stopped and make sure
+ * nothing is replicated, then re-enable it and check that new edits flow
+ * @throws Exception
+ */
+ @Test
+ public void testStartStop() throws Exception {
+
+ // Test stopping replication
+ setIsReplication(false);
+
+ Put put = new Put(Bytes.toBytes("stop start"));
+ put.add(famName, row, row);
+ htable1.put(put);
+
+ Get get = new Get(Bytes.toBytes("stop start"));
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ break;
+ }
+ Result res = htable2.get(get);
+ if(res.size() >= 1) {
+ fail("Replication wasn't stopped");
+
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ // Test restart replication
+ setIsReplication(true);
+
+ htable1.put(put);
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if(res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+
+ put = new Put(Bytes.toBytes("do not rep"));
+ put.add(noRepfamName, row, row);
+ htable1.put(put);
+
+ get = new Get(Bytes.toBytes("do not rep"));
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES-1) {
+ break;
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Not supposed to be replicated");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ }
+
+ /**
+ * Do a more intense version of testSmallBatch, one that will trigger
+ * HLog rolling and other non-trivial code paths
+ * @throws Exception
+ */
+ @Test
+ public void loadTesting() throws Exception {
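+ // A tiny write buffer forces frequent flushes, producing enough edits to roll HLogs.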
+ htable1.setWriteBufferSize(1024);
+ htable1.setAutoFlush(false);
+ for (int i = 0; i < NB_ROWS_IN_BATCH * 10; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.add(famName, row, row);
+ htable1.put(put);
+ }
+ htable1.flushCommits();
+
+ Scan scan = new Scan();
+
+ ResultScanner scanner = htable1.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH * 100);
+ scanner.close();
+
+ assertEquals(NB_ROWS_IN_BATCH * 10, res.length);
+
+ scan = new Scan();
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+
+ scanner = htable2.getScanner(scan);
+ res = scanner.next(NB_ROWS_IN_BATCH * 100);
+ scanner.close();
+ if (res.length != NB_ROWS_IN_BATCH * 10) {
+ if (i == NB_RETRIES-1) {
+ int lastRow = -1;
+ for (Result result : res) {
+ int currentRow = Bytes.toInt(result.getRow());
+ for (int row = lastRow+1; row < currentRow; row++) {
+ LOG.error("Row missing: " + row);
+ }
+ lastRow = currentRow;
+ }
+ LOG.error("Last row: " + lastRow);
+ fail("Waited too much time for normal batch replication, "
+ + res.length + " instead of " + NB_ROWS_IN_BATCH *10);
+ } else {
+ LOG.info("Only got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Load a table with multiple regions over 2 region servers and kill a
+ * region server on each cluster during the upload. The failover happens internally.
+ * @throws Exception
+ */
+ @Test
+ public void queueFailover() throws Exception {
+ utility1.createMultiRegions(htable1, famName);
+
+ // killing the RS hosting .META. can result in failed puts until we solve
+ // IO fencing
+ int rsToKill1 =
+ utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
+ int rsToKill2 =
+ utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
+
+ // The full load takes about 20 secs; kill the region servers around the middle of it
+ Thread killer1 = killARegionServer(utility1, 7500, rsToKill1);
+ Thread killer2 = killARegionServer(utility2, 10000, rsToKill2);
+
+ LOG.info("Start loading table");
+ int initialCount = utility1.loadTable(htable1, famName);
+ LOG.info("Done loading table");
+ killer1.join(5000);
+ killer2.join(5000);
+ LOG.info("Done waiting for threads");
+
+ Result[] res;
+ while (true) {
+ try {
+ Scan scan = new Scan();
+ ResultScanner scanner = htable1.getScanner(scan);
+ res = scanner.next(initialCount);
+ scanner.close();
+ break;
+ } catch (UnknownScannerException ex) {
+ LOG.info("Cluster wasn't ready yet, restarting scanner");
+ }
+ }
+ // Check that we still have all the rows; we may have lost some because we
+ // don't have IO fencing.
+ if (res.length != initialCount) {
+ LOG.warn("We lost some rows on the master cluster!");
+ // We don't really expect the other cluster to have more rows
+ initialCount = res.length;
+ }
+
+ Scan scan2 = new Scan();
+
+ int lastCount = 0;
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for queueFailover replication");
+ }
+ ResultScanner scanner2 = htable2.getScanner(scan2);
+ Result[] res2 = scanner2.next(initialCount * 2);
+ scanner2.close();
+ if (res2.length < initialCount) {
+ if (lastCount < res2.length) {
+ i--; // Don't increment timeout if we make progress
+ }
+ lastCount = res2.length;
+ LOG.info("Only got " + lastCount + " rows instead of " +
+ initialCount + " current i=" + i);
+ Thread.sleep(SLEEP_TIME*2);
+ } else {
+ break;
+ }
+ }
+ }
+
+ private static Thread killARegionServer(final HBaseTestingUtility utility,
+ final long timeout, final int rs) {
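+ // Expire the chosen region server's ZK session after the delay, simulating a crash mid-load.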
+ Thread killer = new Thread() {
+ public void run() {
+ try {
+ Thread.sleep(timeout);
+ utility.expireRegionServerSession(rs);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+ };
+ killer.start();
+ return killer;
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
new file mode 100644
index 00000000000..5e967f34d64
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestReplicationSource {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestReplicationSource.class);
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private static FileSystem fs;
+ private static Path oldLogDir;
+ private static Path logDir;
+ private static Configuration conf = HBaseConfiguration.create();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniDFSCluster(1);
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ oldLogDir = new Path(fs.getHomeDirectory(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = new Path(fs.getHomeDirectory(),
+ HConstants.HREGION_LOGDIR_NAME);
+ }
+
+ /**
+ * Sanity check that we can move logs around while we are reading
+ * from them. Should this test fail, ReplicationSource would have a hard
+ * time reading logs that are being archived.
+ * @throws Exception
+ */
+ @Test
+ public void testLogMoving() throws Exception{
+ Path logPath = new Path(logDir, "log");
+ HLog.Writer writer = HLog.createWriter(fs, logPath, conf);
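+ // Write a few edits, syncing each one so a concurrent reader can see them.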
+ for(int i = 0; i < 3; i++) {
+ byte[] b = Bytes.toBytes(Integer.toString(i));
+ KeyValue kv = new KeyValue(b,b,b);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ HLogKey key = new HLogKey(b, b, 0, 0);
+ writer.append(new HLog.Entry(key, edit));
+ writer.sync();
+ }
+ writer.close();
+
+ HLog.Reader reader = HLog.getReader(fs, logPath, conf);
+ HLog.Entry entry = reader.next();
+ assertNotNull(entry);
+
+ Path oldLogPath = new Path(oldLogDir, "log");
+ fs.rename(logPath, oldLogPath);
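+ // The reader was opened on the old path; it should keep returning entries after the rename.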
+
+ entry = reader.next();
+ assertNotNull(entry);
+
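+ // Read the remaining entry, then one more; reading past the end must return null.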
+ entry = reader.next();
+ entry = reader.next();
+
+ assertNull(entry);
+
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
new file mode 100644
index 00000000000..dc4f458c8d4
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestReplicationSink {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestReplicationSink.class);
+
+ private static final int BATCH_SIZE = 10;
+
+ private static final long SLEEP_TIME = 500;
+
+ private final static Configuration conf = HBaseConfiguration.create();
+
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private static ReplicationSink SINK;
+
+ private static final byte[] TABLE_NAME1 =
+ Bytes.toBytes("table1");
+ private static final byte[] TABLE_NAME2 =
+ Bytes.toBytes("table2");
+
+ private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+ private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+
+ private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+ private static HTable table1;
+
+ private static HTable table2;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+ TEST_UTIL.getConfiguration().setBoolean(
+ HConstants.REPLICATION_ENABLE_KEY, true);
+ TEST_UTIL.startMiniCluster(3);
+ conf.setBoolean("dfs.support.append", true);
+ SINK = new ReplicationSink(conf, STOPPER);
+ table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
+ table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ STOPPER.set(true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
+ table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
+ Thread.sleep(SLEEP_TIME);
+ }
+
+ /**
+ * Insert a whole batch of entries
+ * @throws Exception
+ */
+ @Test
+ public void testBatchSink() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
+ }
+
+ /**
+ * Insert a mix of puts and deletes
+ * @throws Exception
+ */
+ @Test
+ public void testMixedPutDelete() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
+ for(int i = 0; i < BATCH_SIZE/2; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+
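+ // Re-send the same rows, alternating puts on odd rows with column deletes on even ones.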
+ entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i,
+ i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
+ }
+
+ SINK.replicateEntries(entries);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
+ }
+
+ /**
+ * Insert to 2 different tables
+ * @throws Exception
+ */
+ @Test
+ public void testMixedPutTables() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries[i] =
+ createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
+ i, KeyValue.Type.Put);
+ }
+
+ SINK.replicateEntries(entries);
+ Scan scan = new Scan();
+ ResultScanner scanRes = table2.getScanner(scan);
+ for(Result res : scanRes) {
+ assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+ }
+ }
+
+ /**
+ * Insert then do different types of deletes
+ * @throws Exception
+ */
+ @Test
+ public void testMixedDeletes() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[3];
+ for(int i = 0; i < 3; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ entries = new HLog.Entry[3];
+
+ entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
+ entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+ entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
+
+ SINK.replicateEntries(entries);
+
+ Scan scan = new Scan();
+ ResultScanner scanRes = table1.getScanner(scan);
+ assertEquals(0, scanRes.next(3).length);
+ }
+
+ /**
+ * Puts are buffered, but this tests when a delete (not-buffered) is applied
+ * before the actual Put that creates it.
+ * @throws Exception
+ */
+ @Test
+ public void testApplyDeleteBeforePut() throws Exception {
+ HLog.Entry[] entries = new HLog.Entry[5];
+ for(int i = 0; i < 2; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ entries[2] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
+ for(int i = 3; i < 5; i++) {
+ entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
+ }
+ SINK.replicateEntries(entries);
+ Get get = new Get(Bytes.toBytes(1));
+ Result res = table1.get(get);
+ assertEquals(0, res.size());
+ }
+
+ private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
+ byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
+ byte[] rowBytes = Bytes.toBytes(row);
+ // Sleep briefly so two consecutive entries for the same key never share
+ // a timestamp
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ LOG.info("Was interrupted while sleep, meh", e);
+ }
+ final long now = System.currentTimeMillis();
+ KeyValue kv = null;
+ if(type.getCode() == KeyValue.Type.Put.getCode()) {
+ kv = new KeyValue(rowBytes, fam, fam, now,
+ KeyValue.Type.Put, Bytes.toBytes(row));
+ } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
+ kv = new KeyValue(rowBytes, fam, fam,
+ now, KeyValue.Type.DeleteColumn);
+ } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
+ kv = new KeyValue(rowBytes, fam, null,
+ now, KeyValue.Type.DeleteFamily);
+ }
+
+ HLogKey key = new HLogKey(table, table, now, now);
+
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+
+ return new HLog.Entry(key, edit);
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
new file mode 100644
index 00000000000..bb09bc3a355
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URLEncoder;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestReplicationSourceManager {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestReplicationSourceManager.class);
+
+ private static Configuration conf;
+
+ private static HBaseTestingUtility utility;
+
+ private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
+
+ private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
+
+ private static ReplicationSourceManager manager;
+
+ private static ZooKeeperWrapper zkw;
+
+ private static HTableDescriptor htd;
+
+ private static HRegionInfo hri;
+
+ private static final byte[] r1 = Bytes.toBytes("r1");
+
+ private static final byte[] r2 = Bytes.toBytes("r2");
+
+ private static final byte[] f1 = Bytes.toBytes("f1");
+
+ private static final byte[] f2 = Bytes.toBytes("f2");
+
+ private static final byte[] test = Bytes.toBytes("test");
+
+ private static FileSystem fs;
+
+ private static Path oldLogDir;
+
+ private static Path logDir;
+
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ conf = HBaseConfiguration.create();
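+ // Inject the no-op dummy source so the manager can be exercised without a live peer cluster.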
+ conf.set("replication.replicationsource.implementation",
+ ReplicationSourceDummy.class.getCanonicalName());
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ utility = new HBaseTestingUtility(conf);
+ utility.startMiniZKCluster();
+
+ zkw = ZooKeeperWrapper.createInstance(conf, "test");
+ zkw.writeZNode("/hbase", "replication", "");
+ zkw.writeZNode("/hbase/replication", "master",
+ conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf.get("hbase.zookeeper.property.clientPort")+":/1");
+ zkw.writeZNode("/hbase/replication/peers", "1",
+ conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf.get("hbase.zookeeper.property.clientPort")+":/1");
+
+ HRegionServer server = new HRegionServer(conf);
+ ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
+ server.getZooKeeperWrapper(), conf,
+ REPLICATING, "123456789");
+ fs = FileSystem.get(conf);
+ oldLogDir = new Path(utility.getTestDir(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = new Path(utility.getTestDir(),
+ HConstants.HREGION_LOGDIR_NAME);
+
+ manager = new ReplicationSourceManager(helper,
+ conf, STOPPER, fs, REPLICATING, oldLogDir);
+ manager.addSource("1");
+
+ htd = new HTableDescriptor(test);
+ HColumnDescriptor col = new HColumnDescriptor("f1");
+ col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ htd.addFamily(col);
+ col = new HColumnDescriptor("f2");
+ col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+ htd.addFamily(col);
+
+ hri = new HRegionInfo(htd, r1, r2);
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ manager.join();
+ utility.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ fs.delete(logDir, true);
+ fs.delete(oldLogDir, true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ setUp();
+ }
+
+ @Test
+ public void testLogRoll() throws Exception {
+ long seq = 0;
+ long baseline = 1000;
+ long time = baseline;
+ KeyValue kv = new KeyValue(r1, f1, r1);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+
+ HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
+ URLEncoder.encode("regionserver:60020", "UTF8"));
+
+ manager.init();
+
+ // Testing normal log rolling every 20
+ for(long i = 1; i < 101; i++) {
+ if(i > 1 && i % 20 == 0) {
+ hlog.rollWriter();
+ }
+ LOG.info(i);
+ HLogKey key = new HLogKey(hri.getRegionName(),
+ test, seq++, System.currentTimeMillis());
+ hlog.append(hri, key, edit);
+ }
+
+ // Simulate a rapid burst of inserts followed by a position report that is
+ // still not totally complete (it misses the last one)
+ LOG.info(baseline + " and " + time);
+ baseline += 101;
+ time = baseline;
+ LOG.info(baseline + " and " + time);
+
+ for (int i = 0; i < 3; i++) {
+ HLogKey key = new HLogKey(hri.getRegionName(),
+ test, seq++, System.currentTimeMillis());
+ hlog.append(hri, key, edit);
+ }
+
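+ // Five rolls happened during the 100 appends; with the initial log that leaves 6 HLogs tracked.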
+ assertEquals(6, manager.getHLogs().size());
+
+ hlog.rollWriter();
+
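+ // Reporting a position in the newest log lets the manager delete every older one.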
+ manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
+ "1", 0, false);
+
+ HLogKey key = new HLogKey(hri.getRegionName(),
+ test, seq++, System.currentTimeMillis());
+ hlog.append(hri, key, edit);
+
+ assertEquals(1, manager.getHLogs().size());
+
+ // TODO Need a case with only 2 HLogs and we only want to delete the first one
+ }
+
+}