HBASE-2223 Handle 10min+ network partitions between clusters

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@959479 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2010-07-01 00:25:50 +00:00
parent 35616d9205
commit 03933720fa
29 changed files with 3630 additions and 40 deletions

View File

@ -793,6 +793,7 @@ Release 0.21.0 - Unreleased
(Jeff Hammerbacher via Ryan Rawson)
HBASE-7 Provide a HBase checker and repair tool similar to fsck
(dhruba borthakur via Stack)
HBASE-2223 Handle 10min+ network partitions between clusters
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

View File

@ -0,0 +1,75 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script to add a peer to a cluster
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "add_peer"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
c2 = HBaseConfiguration.create()
parts2 = ARGV[1].split(":")
c1 = HBaseConfiguration.create()
c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
zkw1 = ZooKeeperWrapper.createInstance(c1, "ZK1")
zkw1.writeZNode(parts1[2], "replication", "a")
zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
zkw2 = ZooKeeperWrapper.createInstance(c2, "ZK2")
zkw2.writeZNode(parts2[2], "replication", "a")
zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);
puts "Peer successfully added"

View File

@ -0,0 +1,75 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script to recreate all tables from one cluster to another
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "copy_tables_desc"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
parts2 = ARGV[1].split(":")
c1 = HBaseConfiguration.create()
c1.set(HConstants::ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts1[2])
admin1 = HBaseAdmin.new(c1)
c2 = HBaseConfiguration.create()
c2.set(HConstants::ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants::ZOOKEEPER_ZNODE_PARENT, parts2[2])
admin2 = HBaseAdmin.new(c2)
for t in admin1.listTables()
admin2.createTable(t)
end
puts "All descriptions were copied"

View File

@ -444,7 +444,7 @@
<compileSource>1.6</compileSource>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>0.21.0-SNAPSHOT</hbase.version>
<hadoop.version>0.20.3-append-r956776+1240</hadoop.version>
<hadoop.version>0.20.3-append-r956776+1240+tail</hadoop.version>
<commons-cli.version>1.2</commons-cli.version>
<commons-logging.version>1.1.1</commons-logging.version>

View File

@ -345,6 +345,9 @@ public final class HConstants {
*/
public static long DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD = 60000;
public static final String
REPLICATION_ENABLE_KEY = "hbase.replication";
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import java.io.IOException;
import java.util.List;
@ -285,4 +286,16 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
*/
public void bulkLoadHFile(String hfilePath,
byte[] regionName, byte[] familyName) throws IOException;
/**
* Replicates the given entries. The guarantee is that the given entries
* will be durable on the slave cluster if this method returns without
* any exception.
* hbase.replication has to be set to true for this to work.
*
* @param entries entries to replicate
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
}

View File

@ -1023,6 +1023,7 @@ public class HMaster extends Thread implements HMasterInterface,
byte [] rowKey = ((ImmutableBytesWritable)args[0]).get();
pair = getTableRegionForRow(tableName, rowKey);
}
LOG.info("About to " + op.toString() + " on " + Bytes.toString(tableName) + " and pair is " + pair);
if (pair != null && pair.getSecond() != null) {
this.regionManager.startAction(pair.getFirst().getRegionName(),
pair.getFirst(), pair.getSecond(), op);

View File

@ -27,7 +27,9 @@ import org.apache.hadoop.fs.Path;
* Interface for the log cleaning function inside the master. Only 1 is called
* so if the desired effect is the mix of many cleaners, do call them yourself
* in order to control the flow.
* HBase ships with OldLogsCleaner as the default implementation
* HBase ships with OldLogsCleaner as the default implementation.
* This interface extends Configurable, so setConf needs to be called once
* before using the cleaner.
*/
public interface LogCleanerDelegate extends Configurable {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -62,6 +63,13 @@ public class OldLogsCleaner extends Chore {
Configuration conf, FileSystem fs,
Path oldLogDir) {
super("OldLogsCleaner", p, s);
// Use the log cleaner provided by replication if enabled, unless something
// was already provided
if (conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false) &&
conf.get("hbase.master.logcleanerplugin.impl") == null) {
conf.set("hbase.master.logcleanerplugin.impl",
"org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner");
}
this.maxDeletedLogs =
conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
this.fs = fs;

View File

@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
@ -236,6 +237,10 @@ public class HRegionServer implements HRegionInterface,
private final String machineName;
// Replication-related attributes
private Replication replicationHandler;
// End of replication
/**
* Starts a HRegionServer at the default location
* @param conf
@ -913,15 +918,18 @@ public class HRegionServer implements HRegionInterface,
"running at " + this.serverInfo.getServerName() +
" because logdir " + logdir.toString() + " exists");
}
HLog newlog = instantiateHLog(logdir, oldLogDir);
return newlog;
this.replicationHandler = new Replication(this.conf,this.serverInfo,
this.fs, oldLogDir, stopRequested);
HLog log = instantiateHLog(logdir, oldLogDir);
this.replicationHandler.addLogEntryVisitor(log);
return log;
}
// instantiate
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
serverInfo.getServerAddress().toString());
return newlog;
return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
this.replicationHandler.getReplicationManager(),
this.serverInfo.getServerAddress().toString());
}
@ -1052,6 +1060,8 @@ public class HRegionServer implements HRegionInterface,
}
}
this.replicationHandler.startReplicationServices();
// Start Server. This service is like leases in that it internally runs
// a thread.
this.server.start();
@ -1140,7 +1150,7 @@ public class HRegionServer implements HRegionInterface,
this.abortRequested = true;
this.reservedSpace.clear();
if (this.metrics != null) {
LOG.info("Dump of metrics: " + this.metrics.toString());
LOG.info("Dump of metrics: " + this.metrics);
}
stop();
}
@ -1172,6 +1182,7 @@ public class HRegionServer implements HRegionInterface,
Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.compactSplitThread);
Threads.shutdown(this.hlogRoller);
this.replicationHandler.join();
}
private boolean getMaster() {
@ -2444,6 +2455,11 @@ public class HRegionServer implements HRegionInterface,
}
}
@Override
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
this.replicationHandler.replicateLogEntries(entries);
}
/**
* Do class main.
* @param args

View File

@ -36,11 +36,13 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -117,6 +119,7 @@ import com.google.common.util.concurrent.NamingThreadFactory;
*/
public class HLog implements Syncable {
static final Log LOG = LogFactory.getLog(HLog.class);
private static final String HLOG_DATFILE = "hlog.dat.";
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
private final FileSystem fs;
@ -219,6 +222,9 @@ public class HLog implements Syncable {
*/
private final LogSyncer logSyncerThread;
private final List<LogEntryVisitor> logEntryVisitors =
new CopyOnWriteArrayList<LogEntryVisitor>();
/**
* Pattern used to validate a HLog file name
*/
@ -1028,6 +1034,11 @@ public class HLog implements Syncable {
if (!this.enabled) {
return;
}
if (!this.logEntryVisitors.isEmpty()) {
for (LogEntryVisitor visitor : this.logEntryVisitors) {
visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
}
}
try {
long now = System.currentTimeMillis();
this.writer.append(new HLog.Entry(logKey, logEdit));
@ -1179,8 +1190,16 @@ public class HLog implements Syncable {
srcDir.toString());
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
LOG.info("Spliting is done. Removing old log dir "+srcDir);
fs.delete(srcDir, false);
FileStatus[] files = fs.listStatus(srcDir);
for(FileStatus file : files) {
Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
FSUtils.getPath(newPath));
fs.rename(file.getPath(), newPath);
}
LOG.debug("Moved " + files.length + " log files to " +
FSUtils.getPath(oldLogDir));
fs.delete(srcDir, true);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
@ -1632,11 +1651,21 @@ public class HLog implements Syncable {
return new Path(regionDir, RECOVERED_EDITS);
}
/**
*
* @param visitor
*/
public void addLogEntryVisitor(LogEntryVisitor visitor) {
this.logEntryVisitors.add(visitor);
}
/**
*
* @param visitor
*/
public void removeLogEntryVisitor(LogEntryVisitor visitor) {
this.logEntryVisitors.remove(visitor);
}
public void addLogActionsListerner(LogActionsListener list) {

View File

@ -46,7 +46,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
private long writeTime;
private byte clusterId;
private int scope;
/** Writable Consructor -- Do not use. */
public HLogKey() {
@ -70,7 +69,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
this.scope = HConstants.REPLICATION_SCOPE_LOCAL;
}
//////////////////////////////////////////////////////////////////////////////
@ -119,22 +117,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.clusterId = clusterId;
}
/**
* Get the replication scope of this key
* @return replication scope
*/
public int getScope() {
return this.scope;
}
/**
* Set the replication scope of this key
* @param scope The new scope
*/
public void setScope(int scope) {
this.scope = scope;
}
@Override
public String toString() {
return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@ -158,7 +140,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
result ^= this.logSeqNum;
result ^= this.writeTime;
result ^= this.clusterId;
result ^= this.scope;
return result;
}
@ -187,7 +168,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
out.writeByte(this.clusterId);
out.writeInt(this.scope);
}
public void readFields(DataInput in) throws IOException {
@ -197,7 +177,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.writeTime = in.readLong();
try {
this.clusterId = in.readByte();
this.scope = in.readInt();
} catch(EOFException e) {
// Means it's an old key, just continue
}

View File

@ -0,0 +1,15 @@
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.HRegionInfo;
public interface LogEntryVisitor {
/**
*
* @param info
* @param logKey
* @param logEdit
*/
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
WALEdit logEdit);
}

View File

@ -0,0 +1,493 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class serves as a helper for all things related to zookeeper
* in replication.
* <p/>
* The layout looks something like this under zookeeper.znode.parent
* for the master cluster:
* <p/>
* <pre>
* replication/
* master {contains a full cluster address}
* state {contains true or false}
* clusterId {contains a byte}
* peers/
* 1/ {contains a full cluster address}
* 2/
* ...
* rs/ {lists all RS that replicate}
* startcode1/ {lists all peer clusters}
* 1/ {lists hlogs to process}
* 10.10.1.76%3A53488.123456789 {contains nothing or a position}
* 10.10.1.76%3A53488.123456790
* ...
* 2/
* ...
* startcode2/
* ...
* </pre>
*/
public class ReplicationZookeeperWrapper {
private static final Log LOG =
LogFactory.getLog(ReplicationZookeeperWrapper.class);
// Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock";
// Our handle on zookeeper
private final ZooKeeperWrapper zookeeperWrapper;
// Map of addresses of peer clusters with their ZKW
private final Map<String, ZooKeeperWrapper> peerClusters;
// Path to the root replication znode
private final String replicationZNode;
// Path to the peer clusters znode
private final String peersZNode;
// Path to the znode that contains all RS that replicates
private final String rsZNode;
// Path to this region server's name under rsZNode
private final String rsServerNameZnode;
// Name node if the replicationState znode
private final String replicationStateNodeName;
// If this RS is part of a master cluster
private final boolean replicationMaster;
private final Configuration conf;
// Is this cluster replicating at the moment?
private final AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
private final String clusterId;
/**
* Constructor used by region servers, connects to the peer cluster right away.
*
* @param zookeeperWrapper zkw to wrap
* @param conf conf to use
* @param replicating atomic boolean to start/stop replication
* @param rsName the name of this region server, null if
* using RZH only to use the helping methods
* @throws IOException
*/
public ReplicationZookeeperWrapper(
ZooKeeperWrapper zookeeperWrapper, Configuration conf,
final AtomicBoolean replicating, String rsName) throws IOException {
this.zookeeperWrapper = zookeeperWrapper;
this.conf = conf;
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.replication.peers", "peers");
String repMasterZNodeName =
conf.get("zookeeper.znode.replication.master", "master");
this.replicationStateNodeName =
conf.get("zookeeper.znode.replication.state", "state");
String clusterIdZNodeName =
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
String rsZNodeName =
conf.get("zookeeper.znode.replication.rs", "rs");
String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
this.replicationZNode = zookeeperWrapper.getZNode(
zookeeperWrapper.getParentZNode(), replicationZNodeName);
this.peersZNode =
zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
this.rsZNode =
zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
this.replicating = replicating;
setReplicating();
String idResult = Bytes.toString(
this.zookeeperWrapper.getData(this.replicationZNode,
clusterIdZNodeName));
this.clusterId =
idResult == null ?
Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
String address = Bytes.toString(
this.zookeeperWrapper.getData(this.replicationZNode,
repMasterZNodeName));
this.replicationMaster = thisCluster.equals(address);
LOG.info("This cluster (" + thisCluster + ") is a "
+ (this.replicationMaster ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
if (rsName != null) {
this.rsServerNameZnode =
this.zookeeperWrapper.getZNode(rsZNode, rsName);
List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
new ReplicationStatusWatcher());
if (znodes != null) {
for (String znode : znodes) {
connectToPeer(znode);
}
}
} else {
this.rsServerNameZnode = null;
}
}
/**
* Returns all region servers from given peer
*
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
public List<HServerAddress> getPeersAddresses(String peerClusterId) {
if (this.peerClusters.size() == 0) {
return new ArrayList<HServerAddress>(0);
}
ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
return zkw == null?
new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
}
/**
* This method connects this cluster to another one and registers it
* in this region server's replication znode
* @param peerId id of the peer cluster
*/
private void connectToPeer(String peerId) throws IOException {
String[] ensemble =
Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
split(":");
if (ensemble.length != 3) {
throw new IllegalArgumentException("Wrong format of cluster address: " +
this.zookeeperWrapper.getData(this.peersZNode, peerId));
}
Configuration otherConf = new Configuration(this.conf);
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
"connection to cluster: " + peerId);
zkw.registerListener(new ReplicationStatusWatcher());
this.peerClusters.put(peerId, zkw);
this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
this.rsServerNameZnode, peerId));
LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
private void setReplicating() {
String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
this.replicationZNode, this.replicationStateNodeName,
new ReplicationStatusWatcher()));
if (value != null) {
this.replicating.set(value.equals("true"));
LOG.info("Replication is now " + (this.replicating.get() ?
"started" : "stopped"));
}
}
/**
* Add a new log to the list of hlogs in zookeeper
* @param filename name of the hlog's znode
* @param clusterId name of the cluster's znode
*/
public void addLogToList(String filename, String clusterId) {
try {
this.zookeeperWrapper.writeZNode(
this.zookeeperWrapper.getZNode(
this.rsServerNameZnode, clusterId), filename, "");
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
LOG.error(e);
}
}
/**
* Remove a log from the list of hlogs in zookeeper
* @param filename name of the hlog's znode
* @param clusterId name of the cluster's znode
*/
public void removeLogFromList(String filename, String clusterId) {
try {
this.zookeeperWrapper.deleteZNode(
this.zookeeperWrapper.getZNode(this.rsServerNameZnode,
this.zookeeperWrapper.getZNode(clusterId, filename)));
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
LOG.error(e);
}
}
/**
* Set the current position of the specified cluster in the current hlog
* @param filename filename name of the hlog's znode
* @param clusterId clusterId name of the cluster's znode
* @param position the position in the file
* @throws IOException
*/
public void writeReplicationStatus(String filename, String clusterId,
long position) {
try {
String clusterZNode = this.zookeeperWrapper.getZNode(
this.rsServerNameZnode, clusterId);
this.zookeeperWrapper.writeZNode(clusterZNode, filename,
Long.toString(position));
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
LOG.error(e);
}
}
/**
* Get a list of all the other region servers in this cluster
* and set a watch
* @param watch the watch to set
* @return a list of server nanes
*/
public List<String> getRegisteredRegionServers(Watcher watch) {
return this.zookeeperWrapper.listZnodes(
this.zookeeperWrapper.getRsZNode(), watch);
}
/**
* Get the list of the replicators that have queues, they can be alive, dead
* or simply from a previous run
* @param watch the watche to set
* @return a list of server names
*/
public List<String> getListOfReplicators(Watcher watch) {
return this.zookeeperWrapper.listZnodes(rsZNode, watch);
}
/**
* Get the list of peer clusters for the specified server names
* @param rs server names of the rs
* @param watch the watch to set
* @return a list of peer cluster
*/
public List<String> getListPeersForRS(String rs, Watcher watch) {
return this.zookeeperWrapper.listZnodes(
zookeeperWrapper.getZNode(rsZNode, rs), watch);
}
/**
* Get the list of hlogs for the specified region server and peer cluster
* @param rs server names of the rs
* @param id peer cluster
* @param watch the watch to set
* @return a list of hlogs
*/
public List<String> getListHLogsForPeerForRS(String rs, String id, Watcher watch) {
return this.zookeeperWrapper.listZnodes(
zookeeperWrapper.getZNode(zookeeperWrapper.getZNode(rsZNode, rs), id), watch);
}
/**
* Try to set a lock in another server's znode.
* @param znode the server names of the other server
* @return true if the lock was acquired, false in every other cases
*/
public boolean lockOtherRS(String znode) {
try {
this.zookeeperWrapper.writeZNode(
this.zookeeperWrapper.getZNode(this.rsZNode, znode),
RS_LOCK_ZNODE, rsServerNameZnode, true);
} catch (InterruptedException e) {
LOG.error(e);
return false;
} catch (KeeperException e) {
LOG.debug("Won't lock " + znode + " because " + e.getMessage());
// TODO see if the other still exists!!
return false;
}
return true;
}
/**
* This methods copies all the hlogs queues from another region server
* and returns them all sorted per peer cluster (appended with the dead
* server's znode)
* @param znode server names to copy
* @return all hlogs for all peers of that cluster, null if an error occurred
*/
public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
// TODO this method isn't atomic enough, we could start copying and then
// TODO fail for some reason and we would end up with znodes we don't want.
SortedMap<String,SortedSet<String>> queues =
new TreeMap<String,SortedSet<String>>();
try {
String nodePath = this.zookeeperWrapper.getZNode(rsZNode, znode);
List<String> clusters = this.zookeeperWrapper.listZnodes(nodePath, null);
// We have a lock znode in there, it will count as one.
if (clusters == null || clusters.size() <= 1) {
return queues;
}
// The lock isn't a peer cluster, remove it
clusters.remove(RS_LOCK_ZNODE);
for (String cluster : clusters) {
// We add the name of the recovered RS to the new znode, we can even
// do that for queues that were recovered 10 times giving a znode like
// number-startcode-number-otherstartcode-number-anotherstartcode-etc
String newCluster = cluster+"-"+znode;
String newClusterZnode =
this.zookeeperWrapper.getZNode(rsServerNameZnode, newCluster);
this.zookeeperWrapper.ensureExists(newClusterZnode);
String clusterPath = this.zookeeperWrapper.getZNode(nodePath, cluster);
List<String> hlogs = this.zookeeperWrapper.listZnodes(clusterPath, null);
// That region server didn't have anything to replicate for this cluster
if (hlogs == null || hlogs.size() == 0) {
continue;
}
SortedSet<String> logQueue = new TreeSet<String>();
queues.put(newCluster, logQueue);
for (String hlog : hlogs) {
String position = Bytes.toString(
this.zookeeperWrapper.getData(clusterPath, hlog));
LOG.debug("Creating " + hlog + " with data " + position);
this.zookeeperWrapper.writeZNode(newClusterZnode, hlog, position);
logQueue.add(hlog);
}
}
} catch (InterruptedException e) {
LOG.warn(e);
return null;
} catch (KeeperException e) {
LOG.warn(e);
return null;
}
return queues;
}
/**
* Delete a complete queue of hlogs
* @param peerZnode znode of the peer cluster queue of hlogs to delete
*/
public void deleteSource(String peerZnode) {
try {
this.zookeeperWrapper.deleteZNode(
this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
LOG.error(e);
}
}
/**
* Recursive deletion of all znodes in specified rs' znode
* @param znode
*/
public void deleteRsQueues(String znode) {
try {
this.zookeeperWrapper.deleteZNode(
this.zookeeperWrapper.getZNode(rsZNode, znode), true);
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
LOG.error(e);
}
}
/**
* Delete this cluster's queues
*/
public void deleteOwnRSZNode() {
deleteRsQueues(this.rsServerNameZnode);
}
/**
* Get the position of the specified hlog in the specified peer znode
* @param peerId znode of the peer cluster
* @param hlog name of the hlog
* @return the position in that hlog
*/
public long getHLogRepPosition(String peerId, String hlog) {
String clusterZnode =
this.zookeeperWrapper.getZNode(rsServerNameZnode, peerId);
String data = Bytes.toString(
this.zookeeperWrapper.getData(clusterZnode, hlog));
return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
}
/**
* Tells if this cluster replicates or not
*
* @return if this is a master
*/
public boolean isReplicationMaster() {
return this.replicationMaster;
}
/**
* Get the identification of the cluster
*
* @return the id for the cluster
*/
public String getClusterId() {
return this.clusterId;
}
/**
* Get a map of all peer clusters
* @return map of peer cluster, zk address to ZKW
*/
public Map<String, ZooKeeperWrapper> getPeerClusters() {
return this.peerClusters;
}
/**
* Watcher for the status of the replication
*/
public class ReplicationStatusWatcher implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
if (type.equals(Event.EventType.NodeDataChanged)) {
setReplicating();
}
}
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
import org.apache.hadoop.hbase.master.TimeToLiveLogCleaner;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for
* replication before deleting it when its TTL is over.
*/
public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
private static final Log LOG =
LogFactory.getLog(ReplicationLogCleaner.class);
private TimeToLiveLogCleaner ttlCleaner;
private Configuration conf;
private ReplicationZookeeperWrapper zkHelper;
private Set<String> hlogs = new HashSet<String>();
/**
* Instantiates the cleaner, does nothing more.
*/
public ReplicationLogCleaner() {}
@Override
public boolean isLogDeletable(Path filePath) {
// Don't bother going further if the hlog isn't even expired
if (!ttlCleaner.isLogDeletable(filePath)) {
LOG.debug("Won't delete log since not past due " + filePath);
return false;
}
String log = filePath.getName();
// If we saw the hlog previously, let's consider it's still used
// At some point in the future we will refresh the list and it will be gone
if (this.hlogs.contains(log)) {
return false;
}
// Let's see it's still there
// This solution makes every miss very expensive to process since we
// almost completly refresh the cache each time
return !refreshHLogsAndSearch(log);
}
/**
* Search through all the hlogs we have in ZK to refresh the cache
* If a log is specified and found, then we early out and return true
* @param searchedLog log we are searching for, pass null to cache everything
* that's in zookeeper.
* @return false until a specified log is found.
*/
private boolean refreshHLogsAndSearch(String searchedLog) {
this.hlogs.clear();
final boolean lookForLog = searchedLog != null;
List<String> rss = zkHelper.getListOfReplicators(this);
if (rss == null) {
LOG.debug("Didn't find any region server that replicates, deleting: " +
searchedLog);
return false;
}
for (String rs: rss) {
List<String> listOfPeers = zkHelper.getListPeersForRS(rs, this);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this);
if (peersHlogs != null) {
this.hlogs.addAll(peersHlogs);
}
// early exit if we found the log
if(lookForLog && this.hlogs.contains(searchedLog)) {
LOG.debug("Found log in ZK, keeping: " + searchedLog);
return true;
}
}
}
LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
return false;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.ttlCleaner = new TimeToLiveLogCleaner();
this.ttlCleaner.setConf(conf);
try {
this.zkHelper = new ReplicationZookeeperWrapper(
ZooKeeperWrapper.createInstance(this.conf,
HMaster.class.getName()),
this.conf, new AtomicBoolean(true), null);
} catch (IOException e) {
LOG.error(e);
}
refreshHLogsAndSearch(null);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void process(WatchedEvent watchedEvent) {}
}

View File

@ -0,0 +1,128 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<!--
Copyright 2010 The Apache Software Foundation
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head />
<body bgcolor="white">
<h1>Multi Cluster Replication</h1>
This package provides replication between HBase clusters.
<p>
<h2>Table Of Contents</h2>
<ol>
<li><a href="#status">Status</a></li>
<li><a href="#requirements">Requirements</a></li>
<li><a href="#deployment">Deployment</a></li>
</ol>
<p>
<a name="status">
<h2>Status</h2>
</a>
<p>
This package is alpha quality software and is only meant to be a base
for future developments. The current implementation offers the following
features:
<ol>
<li>Master/Slave replication limited to 1 slave cluster. </li>
<li>Replication of scoped families in user tables.</li>
<li>Start/stop replication stream.</li>
<li>Supports clusters of different sizes.</li>
<li>Handling of partitions longer than 10 minutes</li>
</ol>
Please report bugs on the project's Jira when found.
<p>
<a name="requirements">
<h2>Requirements</h2>
</a>
<p>
Before trying out replication, make sure to review the following requirements:
<ol>
<li>Zookeeper should be handled by yourself, not by HBase, and should
always be available during the deployment.</li>
<li>All machines from both clusters should be able to reach every
other machine since replication goes from any region server to any
other one on the slave cluster. That also includes the
Zookeeper clusters.</li>
<li>Both clusters should have the same HBase and Hadoop major revision.
For example, having 0.21.1 on the master and 0.21.0 on the slave is
correct but not 0.21.1 and 0.22.0.</li>
<li>Every table that contains families that are scoped for replication
should exist on every cluster with the exact same name, same for those
replicated families.</li>
</ol>
<p>
<a name="deployment">
<h2>Deployment</h2>
</a>
<p>
The following steps describe how to enable replication from a cluster
to another. This must be done with both clusters offlined.
<ol>
<li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both cluster to add
the following configurations:
<pre>
&lt;property&gt;
&lt;name&gt;hbase.replication.enabled&lt;/name&gt;
&lt;value&gt;true&lt;/value&gt;
&lt;/property&gt;</pre>
</li>
<li>Run the following command on any cluster:
<pre>
$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/replication/bin/add_peer.tb</pre>
This will show you the help to setup the replication stream between
both clusters. If both clusters use the same Zookeeper cluster, you have
to use a different <b>zookeeper.znode.parent</b> since they can't
write in the same folder.
</li>
<li>You can now start and stop the clusters with your preferred method.</li>
</ol>
You can confirm that your setup works by looking at any region server's log
on the master cluster and look for the following lines;
<pre>
Considering 1 rs, with ratio 0.1
Getting 1 rs from peer cluster # 0
Choosing peer 10.10.1.49:62020</pre>
In this case it indicates that 1 region server from the slave cluster
was chosen for replication.<br><br>
Should you want to stop the replication while the clusters are running, open
the shell on the master cluster and issue this command:
<pre>
hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
Where you replace the znode parent with the one configured on your master
cluster. Replication of already queued edits will still happen after you
issued that command but new entries won't be. To start it back, simply replace
"false" with "true" in the command.
<p>
</body>
</html>

View File

@ -0,0 +1,162 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogEntryVisitor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Replication serves as an umbrella over the setup of replication and
* is used by HRS.
*/
public class Replication implements LogEntryVisitor {
private final boolean replication;
private final ReplicationSourceManager replicationManager;
private boolean replicationMaster;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private final ReplicationZookeeperWrapper zkHelper;
private final Configuration conf;
private final AtomicBoolean stopRequested;
private ReplicationSink replicationSink;
/**
* Instantiate the replication management (if rep is enabled).
* @param conf conf to use
* @param hsi the info if this region server
* @param fs handle to the filesystem
* @param oldLogDir directory where logs are archived
* @param stopRequested boolean that tells us if we are shutting down
* @throws IOException
*/
public Replication(Configuration conf, HServerInfo hsi,
FileSystem fs, Path oldLogDir,
AtomicBoolean stopRequested) throws IOException {
this.conf = conf;
this.stopRequested = stopRequested;
this.replication =
conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
if (replication) {
this.zkHelper = new ReplicationZookeeperWrapper(
ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
this.replicating, hsi.getServerName());
this.replicationMaster = zkHelper.isReplicationMaster();
this.replicationManager = this.replicationMaster ?
new ReplicationSourceManager(zkHelper, conf, stopRequested,
fs, this.replicating, oldLogDir) : null;
} else {
replicationManager = null;
zkHelper = null;
}
}
/**
* Join with the replication threads
*/
public void join() {
if (this.replication) {
if (this.replicationMaster) {
this.replicationManager.join();
}
this.zkHelper.deleteOwnRSZNode();
}
}
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
if (this.replication && !this.replicationMaster) {
this.replicationSink.replicateEntries(entries);
}
}
/**
* If replication is enabled and this cluster is a master,
* it starts
* @throws IOException
*/
public void startReplicationServices() throws IOException {
if (this.replication) {
if (this.replicationMaster) {
this.replicationManager.init();
} else {
this.replicationSink =
new ReplicationSink(this.conf, this.stopRequested);
}
}
}
/**
* Get the replication sources manager
* @return the manager if replication is enabled, else returns false
*/
public ReplicationSourceManager getReplicationManager() {
return replicationManager;
}
@Override
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
WALEdit logEdit) {
NavigableMap<byte[], Integer> scopes =
new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
for (KeyValue kv : logEdit.getKeyValues()) {
family = kv.getFamily();
int scope = info.getTableDesc().getFamily(family).getScope();
if (scope != HConstants.REPLICATION_SCOPE_LOCAL &&
!scopes.containsKey(family)) {
scopes.put(family, scope);
}
}
if (!scopes.isEmpty()) {
logEdit.setScopes(scopes);
}
}
/**
* Add this class as a log entry visitor for HLog if replication is enabled
* @param hlog log that was add ourselves on
*/
public void addLogEntryVisitor(HLog hlog) {
if (replication) {
hlog.addLogEntryVisitor(this);
}
}
}

View File

@ -0,0 +1,203 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class is responsible for replicating the edits coming
* from another cluster.
* <p/>
* This replication process is currently waiting for the edits to be applied
* before the method can return. This means that the replication of edits
* is synchronized (after reading from HLogs in ReplicationSource) and that a
* single region server cannot receive edits from two sources at the same time
* <p/>
* This class uses the native HBase client in order to replicate entries.
* <p/>
*
* TODO make this class more like ReplicationSource wrt log handling
*/
public class ReplicationSink {
private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
// Name of the HDFS directory that contains the temporary rep logs
public static final String REPLICATION_LOG_DIR = ".replogs";
private final Configuration conf;
// Pool used to replicated
private final HTablePool pool;
// boolean coming from HRS to know when the process stops
private final AtomicBoolean stop;
/**
* Create a sink for replication
*
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(Configuration conf, AtomicBoolean stopper)
throws IOException {
this.conf = conf;
this.pool = new HTablePool(this.conf,
conf.getInt("replication.sink.htablepool.capacity", 10));
this.stop = stopper;
}
/**
* Replicate this array of entries directly into the local cluster
* using the native client.
*
* @param entries
* @throws IOException
*/
public synchronized void replicateEntries(HLog.Entry[] entries)
throws IOException {
// Very simple optimization where we batch sequences of rows going
// to the same table.
try {
long totalReplicated = 0;
byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
List<Put> puts = new ArrayList<Put>();
for (HLog.Entry entry : entries) {
WALEdit edit = entry.getEdit();
List<KeyValue> kvs = edit.getKeyValues();
if (kvs.get(0).isDelete()) {
Delete delete = new Delete(kvs.get(0).getRow(),
kvs.get(0).getTimestamp(), null);
for (KeyValue kv : kvs) {
if (kv.isDeleteFamily()) {
delete.deleteFamily(kv.getFamily());
} else if (!kv.isEmptyColumn()) {
delete.deleteColumn(kv.getFamily(),
kv.getQualifier());
}
}
delete(entry.getKey().getTablename(), delete);
} else {
// Switching table, flush
if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
put(lastTable, puts);
}
// With mini-batching, we need to expect multiple rows per edit
byte[] lastKey = kvs.get(0).getRow();
Put put = new Put(kvs.get(0).getRow(),
kvs.get(0).getTimestamp());
for (KeyValue kv : kvs) {
if (!Bytes.equals(lastKey, kv.getRow())) {
puts.add(put);
put = new Put(kv.getRow(), kv.getTimestamp());
}
put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
lastKey = kv.getRow();
}
puts.add(put);
lastTable = entry.getKey().getTablename();
}
totalReplicated++;
}
put(lastTable, puts);
LOG.info("Total replicated: " + totalReplicated);
} catch (IOException ex) {
if (ex.getCause() instanceof TableNotFoundException) {
LOG.warn("Losing edits because: ", ex);
} else {
// Should we log rejected edits in a file for replay?
LOG.error("Unable to accept edit because", ex);
this.stop.set(true);
throw ex;
}
} catch (RuntimeException re) {
if (re.getCause() instanceof TableNotFoundException) {
LOG.warn("Losing edits because: ", re);
} else {
this.stop.set(true);
throw re;
}
}
}
/**
* Do the puts and handle the pool
* @param tableName table to insert into
* @param puts list of puts
* @throws IOException
*/
private void put(byte[] tableName, List<Put> puts) throws IOException {
if (puts.isEmpty()) {
return;
}
HTableInterface table = null;
try {
table = this.pool.getTable(tableName);
table.put(puts);
this.pool.putTable(table);
puts.clear();
} finally {
if (table != null) {
this.pool.putTable(table);
}
}
}
/**
* Do the delete and handle the pool
* @param tableName table to delete in
* @param delete the delete to use
* @throws IOException
*/
private void delete(byte[] tableName, Delete delete) throws IOException {
HTableInterface table = null;
try {
table = this.pool.getTable(tableName);
table.delete(delete);
this.pool.putTable(table);
} finally {
if (table != null) {
this.pool.putTable(table);
}
}
}
}

View File

@ -0,0 +1,647 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if replication ration = 0.1
* and slave cluster has 100 region servers, 10 will be selected.
* <p/>
* A stream is considered down when we cannot contact a region server on the
* peer cluster for more than 55 seconds by default.
* <p/>
*
*/
public class ReplicationSource extends Thread
implements ReplicationSourceInterface {
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Lock to manage when a HLog is changing path (archiving)
private final ReentrantLock pathLock = new ReentrantLock();
// Queue of logs to process
private PriorityBlockingQueue<Path> queue;
// container of entries to replicate
private HLog.Entry[] entriesArray;
private HConnection conn;
// Helper class for zookeeper
private ReplicationZookeeperWrapper zkHelper;
private Configuration conf;
// ratio of region servers to chose from a slave cluster
private float ratio;
private Random random;
// should we replicate or not?
private AtomicBoolean replicating;
// id of the peer cluster this source replicates to
private String peerClusterId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
private AtomicBoolean stop;
// List of chosen sinks (region servers)
private List<HServerAddress> currentPeers;
// How long should we sleep for each retry
private long sleepForRetries;
// Max size in bytes of entriesArray
private long replicationQueueSizeCapacity;
// Max number of entries in entriesArray
private int replicationQueueNbCapacity;
// Our reader for the current log
private HLog.Reader reader;
// Current position in the log
private long position = 0;
// Path of the current log
private volatile Path currentPath;
private FileSystem fs;
// id of this cluster
private byte clusterId;
// total number of edits we replicated
private long totalReplicatedEdits = 0;
// The znode we currently play with
private String peerClusterZnode;
// Indicates if this queue is recovered (and will be deleted when depleted)
private boolean queueRecovered;
// Maximum number of retries before taking bold actions
private long maxRetriesMultiplier;
// Current number of entries that we need to replicate
private int currentNbEntries = 0;
// Indicates if this particular source is running
private volatile boolean running = true;
/**
* Instantiation method used by region servers
*
* @param conf configuration to use
* @param fs file system to use
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
* @param replicating the atomic boolean that starts/stops replication
* @param peerClusterZnode the name of our znode
* @throws IOException
*/
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
final AtomicBoolean stopper,
final AtomicBoolean replicating,
final String peerClusterZnode)
throws IOException {
this.stop = stopper;
this.conf = conf;
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000);
this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
this.entriesArray[i] = new HLog.Entry();
}
this.maxRetriesMultiplier =
this.conf.getLong("replication.source.maxretriesmultiplier", 10);
this.queue =
new PriorityBlockingQueue<Path>(
conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
this.conn = HConnectionManager.getConnection(conf);
this.zkHelper = manager.getRepZkWrapper();
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList<HServerAddress>();
this.random = new Random();
this.replicating = replicating;
this.manager = manager;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.clusterId = Byte.valueOf(zkHelper.getClusterId());
// Finally look if this is a recovered queue
this.checkIfQueueRecovered(peerClusterZnode);
}
// The passed znode will be either the id of the peer cluster or
// the handling story of that queue in the form of id-startcode-*
private void checkIfQueueRecovered(String peerClusterZnode) {
String[] parts = peerClusterZnode.split("-");
this.queueRecovered = parts.length != 1;
this.peerClusterId = this.queueRecovered ?
parts[0] : peerClusterZnode;
this.peerClusterZnode = peerClusterZnode;
}
/**
* Select a number of peers at random using the ratio. Mininum 1.
*/
private void chooseSinks() {
this.currentPeers.clear();
List<HServerAddress> addresses =
this.zkHelper.getPeersAddresses(peerClusterId);
Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
" rs from peer cluster # " + peerClusterId);
for (int i = 0; i < nbPeers; i++) {
HServerAddress address;
// Make sure we get one address that we don't already have
do {
address = addresses.get(this.random.nextInt(addresses.size()));
} while (setOfAddr.contains(address));
LOG.info("Choosing peer " + address);
setOfAddr.add(address);
}
this.currentPeers.addAll(setOfAddr);
}
@Override
public void enqueueLog(Path log) {
this.queue.put(log);
}
@Override
public void logArchived(Path oldPath, Path newPath) {
// in sync with queue polling
this.pathLock.lock();
try {
if (oldPath.equals(this.currentPath)) {
this.currentPath = newPath;
LOG.debug("Current log moved, changing currentPath to " +newPath);
return;
}
boolean present = this.queue.remove(oldPath);
LOG.debug("old log was " + (present ?
"present, changing the queue" : "already processed"));
if (present) {
this.queue.add(newPath);
}
} finally {
this.pathLock.unlock();
}
}
@Override
public void run() {
connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (this.stop.get()) {
return;
}
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
this.position = this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName());
}
int sleepMultiplier = 1;
// Loop until we close down
while (!stop.get() && this.running) {
// In sync with logArchived
this.pathLock.lock();
try {
// Get a new path
if (!getNextPath()) {
if (sleepForRetries("No log to process", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
// Open a reader on it
if (!openReader(sleepMultiplier)) {
continue;
}
} finally {
this.pathLock.unlock();
}
// If we got a null reader but didn't continue, then sleep and continue
if (this.reader == null) {
if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
boolean gotIOE = false; // TODO this is a hack for HDFS-1057
currentNbEntries = 0;
try {
if(readAllEntriesToReplicateOrNextFile()) {
continue;
}
} catch (IOException ioe) {
LOG.warn(peerClusterZnode + " Got: ", ioe);
gotIOE = true;
if (ioe.getCause() instanceof EOFException) {
boolean considerDumping = false;
if (this.queueRecovered) {
try {
FileStatus stat = this.fs.getFileStatus(this.currentPath);
if (stat.getLen() == 0) {
LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
}
considerDumping = true;
} catch (IOException e) {
LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
}
} else if (currentNbEntries != 0) {
LOG.warn(peerClusterZnode + " Got EOF while reading, " +
"looks like this file is broken? " + currentPath);
considerDumping = true;
currentNbEntries = 0;
}
if (considerDumping &&
sleepMultiplier == this.maxRetriesMultiplier &&
processEndOfFile()) {
continue;
}
}
} finally {
try {
// if current path is null, it means we processEndOfFile hence
if (this.currentPath != null && !gotIOE) {
this.position = this.reader.getPosition();
}
if (this.reader != null) {
this.reader.close();
}
} catch (IOException e) {
gotIOE = true;
LOG.warn("Unable to finalize the tailing of a file", e);
}
}
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (!stop.get() && (gotIOE || currentNbEntries == 0)) {
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
sleepMultiplier = 1;
shipEdits();
}
LOG.debug("Source exiting " + peerClusterId);
}
/**
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
long seenEntries = 0;
if (this.position != 0) {
this.reader.seek(this.position);
}
HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
while (entry != null) {
WALEdit edit = entry.getEdit();
seenEntries++;
// Remove all KVs that should not be replicated
removeNonReplicableEdits(edit);
HLogKey logKey = entry.getKey();
// Don't replicate catalog entries, if the WALEdit wasn't
// containing anything to replicate and if we're currently not set to replicate
if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
edit.size() != 0 && replicating.get()) {
logKey.setClusterId(this.clusterId);
currentNbEntries++;
}
// Stop if too many entries or too big
if ((this.reader.getPosition() - this.position)
>= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) {
break;
}
entry = this.reader.next(entriesArray[currentNbEntries]);
}
LOG.debug("currentNbEntries:" + currentNbEntries +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - this.position));
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
}
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
while (!this.stop.get() && this.currentPeers.size() == 0) {
try {
chooseSinks();
Thread.sleep(this.sleepForRetries);
} catch (InterruptedException e) {
LOG.error("Interrupted while trying to connect to sinks", e);
}
}
}
/**
* Poll for the next path
* @return true if a path was obtained, false if not
*/
protected boolean getNextPath() {
try {
if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
}
return this.currentPath != null;
}
/**
* Open a reader on the current path
*
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return true if we should continue with that file, false if we are over with it
*/
protected boolean openReader(int sleepMultiplier) {
try {
LOG.info("Opening log for replication " + this.currentPath.getName() + " at " + this.position);
this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
} catch (IOException ioe) {
this.reader = null;
LOG.warn(peerClusterZnode + " Got: ", ioe);
// TODO Need a better way to determinate if a file is really gone but
// TODO without scanning all logs dir
if (sleepMultiplier == this.maxRetriesMultiplier) {
LOG.warn("Waited too long for this file, considering dumping");
return !processEndOfFile();
}
}
return true;
}
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
}
return sleepMultiplier < maxRetriesMultiplier;
}
/**
* We only want KVs that are scoped other than local
* @param edit The KV to check for replication
*/
protected void removeNonReplicableEdits(WALEdit edit) {
NavigableMap<byte[], Integer> scopes = edit.getScopes();
List<KeyValue> kvs = edit.getKeyValues();
for (int i = 0; i < edit.size(); i++) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
if (scopes == null || !scopes.containsKey(kv.getFamily())) {
kvs.remove(i);
i--;
}
}
}
/**
* Do the shipping logic
*/
protected void shipEdits() {
int sleepMultiplier = 1;
while (!stop.get()) {
try {
HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
this.totalReplicatedEdits += currentNbEntries;
LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
break;
} catch (IOException ioe) {
LOG.warn("Unable to replicate because ", ioe);
try {
boolean down;
do {
down = isSlaveDown();
if (down) {
LOG.debug("The region server we tried to ping didn't answer, " +
"sleeping " + sleepForRetries + " times " + sleepMultiplier);
Thread.sleep(this.sleepForRetries * sleepMultiplier);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
} else {
chooseSinks();
}
}
} while (!stop.get() && down);
} catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster");
}
}
}
}
/**
* If the queue isn't empty, switch to the next one
* Else if this is a recovered queue, it means we're done!
* Else we'll just continue to try reading the log file
* @return true if we're done with the current file, false if we should
* continue trying to read from it
*/
protected boolean processEndOfFile() {
this.pathLock.lock();
try {
if (this.queue.size() != 0) {
this.currentPath = null;
this.position = 0;
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
this.abort();
return true;
}
} finally {
this.pathLock.unlock();
}
return false;
}
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
LOG.fatal("Set stop flag in " + t.getName(), e);
abort();
}
};
Threads.setDaemonThreadRunning(
this, n + ".replicationSource," + clusterId, handler);
}
/**
* Hastily stop the replication, then wait for shutdown
*/
private void abort() {
LOG.info("abort");
this.running = false;
terminate();
}
public void terminate() {
LOG.info("terminate");
Threads.shutdown(this, this.sleepForRetries);
}
/**
* Get a new region server at random from this peer
* @return
* @throws IOException
*/
private HRegionInterface getRS() throws IOException {
if (this.currentPeers.size() == 0) {
throw new IOException(this.peerClusterZnode + " has 0 region servers");
}
HServerAddress address =
currentPeers.get(random.nextInt(this.currentPeers.size()));
return this.conn.getHRegionConnection(address);
}
/**
* Check if the slave is down by trying to establish a connection
* @return true if down, false if up
* @throws InterruptedException
*/
public boolean isSlaveDown() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
Thread pingThread = new Thread() {
public void run() {
try {
HRegionInterface rrs = getRS();
// Dummy call which should fail
rrs.getHServerInfo();
latch.countDown();
} catch (IOException ex) {
LOG.info("Slave cluster looks down: " + ex.getMessage());
}
}
};
pingThread.start();
// awaits returns true if countDown happened
boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
pingThread.interrupt();
return down;
}
/**
* Get the id that the source is replicating to
*
* @return peer cluster id
*/
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
/**
* Get the path of the current HLog
* @return current hlog's path
*/
public Path getCurrentPath() {
return this.currentPath;
}
/**
* Comparator used to compare logs together based on their start time
*/
public static class LogsComparator implements Comparator<Path> {
@Override
public int compare(Path o1, Path o2) {
return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
}
@Override
public boolean equals(Object o) {
return true;
}
/**
* Split a path to get the start time
* For example: 10.20.20.171%3A60020.1277499063250
* @param p path to split
* @return start time
*/
private long getTS(Path p) {
String[] parts = p.getName().split("\\.");
return Long.parseLong(parts[parts.length-1]);
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Interface that defines a replication source
*/
public interface ReplicationSourceInterface {
/**
* Initializer for the source
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster
* @throws IOException
*/
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
final AtomicBoolean stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException;
/**
* Add a log to the list of logs to replicate
* @param log path to the log to replicate
*/
public void enqueueLog(Path log);
/**
* Get the current log that's replicated
* @return the current log
*/
public Path getCurrentPath();
/**
* Start the replication
*/
public void startup();
/**
* End the replication
*/
public void terminate();
/**
* Get the id that the source is replicating to
*
* @return peer cluster id
*/
public String getPeerClusterZnode();
/**
* Notify this source that a log was archived
* @param oldPath old path of the log
* @param newPath new path of the log (archive)
*/
public void logArchived(Path oldPath, Path newPath);
}

View File

@ -0,0 +1,353 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
* <li> Normal sources are persistent and one per peer cluster</li>
* <li> Old sources are recovered from a failed region server and our
* only goal is to finish replicating the HLog queue it had up in ZK</li>
*
* When a region server dies, this class uses a watcher to get notified and it
* tries to grab a lock in order to transfer all the queues in a local
* old source.
*/
public class ReplicationSourceManager implements LogActionsListener {
private static final Log LOG =
LogFactory.getLog(ReplicationSourceManager.class);
// List of all the sources that read this RS's logs
private final List<ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
// Indicates if we are currently replicating
private final AtomicBoolean replicating;
// Helper for zookeeper
private final ReplicationZookeeperWrapper zkHelper;
// Indicates if the region server is closing
private final AtomicBoolean stopper;
// All logs we are currently trackign
private final SortedSet<String> hlogs;
private final Configuration conf;
private final FileSystem fs;
// The path to the latest log we saw, for new coming sources
private Path latestPath;
// List of all the other region servers in this cluster
private final List<String> otherRegionServers;
// Path to the hlog archive
private final Path oldLogDir;
/**
* Creates a replication manager and sets the watch on all the other
* registered region servers
* @param zkHelper the zk helper for replication
* @param conf the configuration to use
* @param stopper the stopper object for this region server
* @param fs the file system to use
* @param replicating the status of the replication on this cluster
* @param oldLogDir the directory where old logs are archived
*/
public ReplicationSourceManager(final ReplicationZookeeperWrapper zkHelper,
final Configuration conf,
final AtomicBoolean stopper,
final FileSystem fs,
final AtomicBoolean replicating,
final Path oldLogDir) {
this.sources = new ArrayList<ReplicationSourceInterface>();
this.replicating = replicating;
this.zkHelper = zkHelper;
this.stopper = stopper;
this.hlogs = new TreeSet<String>();
this.oldsources = new ArrayList<ReplicationSourceInterface>();
this.conf = conf;
this.fs = fs;
this.oldLogDir = oldLogDir;
List<String> otherRSs =
this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
}
/**
* Provide the id of the peer and a log key and this method will figure which
* hlog it belongs to and will log, for this region server, the current
* position. It will also clean old logs from the queue.
* @param log Path to the log currently being replicated from
* replication status in zookeeper. It will also delete older entries.
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
*/
public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key.toString(), id, position);
synchronized (this.hlogs) {
if (!queueRecovered && this.hlogs.first() != key) {
SortedSet<String> hlogSet = this.hlogs.headSet(key);
LOG.info("Removing " + hlogSet.size() +
" logs in the list: " + hlogSet);
for (String hlog : hlogSet) {
this.zkHelper.removeLogFromList(hlog.toString(), id);
}
hlogSet.clear();
}
}
}
/**
* Adds a normal source per registered peer cluster and tries to process all
* old region server hlog queues
*/
public void init() throws IOException {
for (String id : this.zkHelper.getPeerClusters().keySet()) {
ReplicationSourceInterface src = addSource(id);
src.startup();
}
List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
synchronized (otherRegionServers) {
LOG.info("Current list of replicators: " + currentReplicators
+ " other RSs: " + otherRegionServers);
}
// Look if there's anything to process after a restart
for (String rs : currentReplicators) {
synchronized (otherRegionServers) {
if (!this.otherRegionServers.contains(rs)) {
transferQueues(rs);
}
}
}
}
/**
* Add a new normal source to this region server
* @param id the id of the peer cluster
* @return the created source
* @throws IOException
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
this.sources.add(src);
synchronized (this.hlogs) {
if (this.hlogs.size() > 0) {
this.zkHelper.addLogToList(this.hlogs.first(),
this.sources.get(0).getPeerClusterZnode());
src.enqueueLog(this.latestPath);
}
}
return src;
}
/**
* Terminate the replication on this region server
*/
public void join() {
if (this.sources.size() == 0) {
this.zkHelper.deleteOwnRSZNode();
}
for (ReplicationSourceInterface source : this.sources) {
source.terminate();
}
}
/**
* Get a copy of the hlogs of the first source on this rs
* @return a sorted set of hlog names
*/
protected SortedSet<String> getHLogs() {
return new TreeSet(this.hlogs);
}
/**
* Get a list of all the normal sources of this rs
* @return lis of all sources
*/
public List<ReplicationSourceInterface> getSources() {
return this.sources;
}
@Override
public void logRolled(Path newLog) {
if (this.sources.size() > 0) {
this.zkHelper.addLogToList(newLog.getName(),
this.sources.get(0).getPeerClusterZnode());
}
synchronized (this.hlogs) {
this.hlogs.add(newLog.getName());
}
this.latestPath = newLog;
for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog);
}
}
@Override
public void logArchived(Path oldPath, Path newPath) {
for (ReplicationSourceInterface source : this.sources) {
source.logArchived(oldPath, newPath);
}
}
/**
* Get the ZK help of this manager
* @return the helper
*/
public ReplicationZookeeperWrapper getRepZkWrapper() {
return zkHelper;
}
/**
* Factory method to create a replication source
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster
* @return the created source
* @throws IOException
*/
public ReplicationSourceInterface getReplicationSource(
final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
final AtomicBoolean stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException {
ReplicationSourceInterface src;
try {
Class c = Class.forName(conf.get("replication.replicationsource.implementation",
ReplicationSource.class.getCanonicalName()));
src = (ReplicationSourceInterface) c.newInstance();
} catch (Exception e) {
LOG.warn("Passed replication source implemention throws errors, " +
"defaulting to ReplicationSource", e);
src = new ReplicationSource();
}
src.init(conf, fs, manager, stopper, replicating, peerClusterId);
return src;
}
/**
* Transfer all the queues of the specified to this region server.
* First it tries to grab a lock and if it works it will move the
* znodes and finally will delete the old znodes.
*
* It creates one old source for any type of source of the old rs.
* @param rsZnode
*/
public void transferQueues(String rsZnode) {
// We try to lock that rs' queue directory
if (this.stopper.get()) {
LOG.info("Not transferring queue since we are shutting down");
return;
}
if (!this.zkHelper.lockOtherRS(rsZnode)) {
return;
}
LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
SortedMap<String, SortedSet<String>> newQueues =
this.zkHelper.copyQueuesFromRS(rsZnode);
if (newQueues == null || newQueues.size() == 0) {
return;
}
this.zkHelper.deleteRsQueues(rsZnode);
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
try {
ReplicationSourceInterface src = getReplicationSource(this.conf,
this.fs, this, this.stopper, this.replicating, peerId);
this.oldsources.add(src);
for (String hlog : entry.getValue()) {
src.enqueueLog(new Path(this.oldLogDir, hlog));
}
src.startup();
} catch (IOException e) {
// TODO manage it
LOG.error("Failed creating a source", e);
}
}
}
/**
* Clear the references to the specified old source
* @param src source to clear
*/
public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
this.oldsources.remove(src);
this.zkHelper.deleteSource(src.getPeerClusterZnode());
}
/**
* Watcher used to be notified of the other region server's death
* in the local cluster. It initiates the process to transfer the queues
* if it is able to grab the lock.
*/
public class OtherRegionServerWatcher implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
LOG.info(" event " + watchedEvent);
if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
return;
}
List<String> newRsList = (zkHelper.getRegisteredRegionServers(this));
if (newRsList == null) {
return;
} else {
synchronized (otherRegionServers) {
otherRegionServers.clear();
otherRegionServers.addAll(newRsList);
}
}
if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
String[] rsZnodeParts = watchedEvent.getPath().split("/");
transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
}
}
}
}

View File

@ -467,6 +467,7 @@ public class ZooKeeperWrapper implements Watcher {
private HServerAddress readAddress(String znode, Watcher watcher) {
try {
LOG.debug("<" + instanceName + ">" + "Trying to read " + znode);
return readAddressOrThrow(znode, watcher);
} catch (IOException e) {
LOG.debug("<" + instanceName + ">" + "Failed to read " + e.getMessage());
@ -572,7 +573,7 @@ public class ZooKeeperWrapper implements Watcher {
if (recursive) {
LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode);
List<String> znodes = this.zooKeeper.getChildren(znode, false);
if (znodes.size() > 0) {
if (znodes != null && znodes.size() > 0) {
for (String child : znodes) {
String childFullPath = getZNode(znode, child);
LOG.info("<" + instanceName + ">" + "deleteZNode recursive call " + childFullPath);
@ -914,10 +915,10 @@ public class ZooKeeperWrapper implements Watcher {
if (failOnWrite || stat == null) {
this.zooKeeper.create(path, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.debug("<" + instanceName + ">" + "Created " + path);
LOG.debug("<" + instanceName + ">" + "Created " + path + " with data " + strData);
} else {
this.zooKeeper.setData(path, data, -1);
LOG.debug("<" + instanceName + ">" + "Updated " + path);
LOG.debug("<" + instanceName + ">" + "Updated " + path + " with data " + strData);
}
}

View File

@ -466,6 +466,7 @@ public class HBaseTestingUtility {
* @throws IOException
*/
public int loadTable(final HTable t, final byte[] f) throws IOException {
t.setAutoFlush(false);
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
@ -481,6 +482,7 @@ public class HBaseTestingUtility {
}
}
}
t.flushCommits();
return rowCount;
}

View File

@ -525,4 +525,47 @@ public class TestHLog extends HBaseTestCase {
}
}
}
/**
* Test that we can visit entries before they are appended
* @throws Exception
*/
public void testVisitors() throws Exception {
final int COL_COUNT = 10;
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
this.conf.setBoolean("dfs.support.append", true);
HLog log = new HLog(this.fs, dir, this.oldLogDir, this.conf, null);
DumbLogEntriesVisitor visitor = new DumbLogEntriesVisitor();
log.addLogEntryVisitor(visitor);
long timestamp = System.currentTimeMillis();
HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
for (int i = 0; i < COL_COUNT; i++) {
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[]{(byte) (i + '0')}));
log.append(hri, tableName, cols, System.currentTimeMillis());
}
assertEquals(COL_COUNT, visitor.increments);
log.removeLogEntryVisitor(visitor);
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)),
timestamp, new byte[]{(byte) (11 + '0')}));
log.append(hri, tableName, cols, System.currentTimeMillis());
assertEquals(COL_COUNT, visitor.increments);
}
static class DumbLogEntriesVisitor implements LogEntryVisitor {
int increments = 0;
@Override
public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
WALEdit logEdit) {
increments++;
}
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
*/
public class ReplicationSourceDummy implements ReplicationSourceInterface {
ReplicationSourceManager manager;
String peerClusterId;
Path currentPath;
@Override
public void init(Configuration conf, FileSystem fs,
ReplicationSourceManager manager, AtomicBoolean stopper,
AtomicBoolean replicating, String peerClusterId)
throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
}
@Override
public void enqueueLog(Path log) {
this.currentPath = log;
}
@Override
public Path getCurrentPath() {
return this.currentPath;
}
@Override
public void startup() {
}
@Override
public void terminate() {
}
@Override
public String getPeerClusterZnode() {
return peerClusterId;
}
@Override
public void logArchived(Path oldPath, Path newPath) {
}
}

View File

@ -0,0 +1,478 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.EmptyWatcher;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestReplication {
private static final Log LOG = LogFactory.getLog(TestReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static ZooKeeperWrapper zkw1;
private static ZooKeeperWrapper zkw2;
private static HTable htable1;
private static HTable htable2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static final int NB_ROWS_IN_BATCH = 100;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 10;
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] noRepfamName = Bytes.toBytes("norep");
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
conf1.setInt("replication.source.nb.capacity", 5);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setLong("hbase.client.retries.number", 4);
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
zkw1.writeZNode("/1", "replication", "");
zkw1.writeZNode("/1/replication", "master",
conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
setIsReplication(true);
LOG.info("Setup first Zk");
conf2 = HBaseConfiguration.create();
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf2.setInt("hbase.client.retries.number", 6);
conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf2.setBoolean("dfs.support.append", true);
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
zkw2.writeZNode("/2", "replication", "");
zkw2.writeZNode("/2/replication", "master",
conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
zkw1.writeZNode("/1/replication/peers", "1",
conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2");
LOG.info("Setup second Zk");
utility1.startMiniCluster(2);
utility2.startMiniCluster(2);
HTableDescriptor table = new HTableDescriptor(tableName);
table.setDeferredLogFlush(false);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
HBaseAdmin admin1 = new HBaseAdmin(conf1);
HBaseAdmin admin2 = new HBaseAdmin(conf2);
admin1.createTable(table);
admin2.createTable(table);
htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024*5);
htable2 = new HTable(conf2, tableName);
}
private static void setIsReplication(boolean rep) throws Exception {
LOG.info("Set rep " + rep);
zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
// Takes some ms for ZK to fire the watcher
Thread.sleep(SLEEP_TIME);
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
setIsReplication(false);
utility1.truncateTable(tableName);
utility2.truncateTable(tableName);
// If test is flaky, set that sleep higher
Thread.sleep(SLEEP_TIME*8);
setIsReplication(true);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
/**
* Add a row, check it's replicated, delete it, check's gone
* @throws Exception
*/
@Test
public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete");
Put put = new Put(row);
put.add(famName, row, row);
htable1 = new HTable(conf1, tableName);
htable1.put(put);
HTable table2 = new HTable(conf2, tableName);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = table2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
break;
}
}
Delete del = new Delete(row);
htable1.delete(del);
table2 = new HTable(conf2, tableName);
get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
Result res = table2.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
}
/**
* Try a small batch upload using the write buffer, check it's replicated
* @throws Exception
*/
@Test
public void testSmallBatch() throws Exception {
LOG.info("testSmallBatch");
Put put;
// normal Batch tests
htable1.setAutoFlush(false);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
htable1.put(put);
}
htable1.flushCommits();
Scan scan = new Scan();
ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close();
assertEquals(NB_ROWS_IN_BATCH, res1.length);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for normal batch replication");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH);
scanner.close();
if (res.length != NB_ROWS_IN_BATCH) {
LOG.info("Only got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
htable1.setAutoFlush(true);
}
/**
* Test stopping replication, trying to insert, make sure nothing's
* replicated, enable it, try replicating and it should work
* @throws Exception
*/
@Test
public void testStartStop() throws Exception {
// Test stopping replication
setIsReplication(false);
Put put = new Put(Bytes.toBytes("stop start"));
put.add(famName, row, row);
htable1.put(put);
Get get = new Get(Bytes.toBytes("stop start"));
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
break;
}
Result res = htable2.get(get);
if(res.size() >= 1) {
fail("Replication wasn't stopped");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
// Test restart replication
setIsReplication(true);
htable1.put(put);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = htable2.get(get);
if(res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
break;
}
}
put = new Put(Bytes.toBytes("do not rep"));
put.add(noRepfamName, row, row);
htable1.put(put);
get = new Get(Bytes.toBytes("do not rep"));
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES-1) {
break;
}
Result res = htable2.get(get);
if (res.size() >= 1) {
fail("Not supposed to be replicated");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
}
/**
* Do a more intense version testSmallBatch, one that will trigger
* hlog rolling and other non-trivial code paths
* @throws Exception
*/
@Test
public void loadTesting() throws Exception {
htable1.setWriteBufferSize(1024);
htable1.setAutoFlush(false);
for (int i = 0; i < NB_ROWS_IN_BATCH *10; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
htable1.put(put);
}
htable1.flushCommits();
Scan scan = new Scan();
ResultScanner scanner = htable1.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH * 100);
scanner.close();
assertEquals(NB_ROWS_IN_BATCH *10, res.length);
scan = new Scan();
for (int i = 0; i < NB_RETRIES; i++) {
scanner = htable2.getScanner(scan);
res = scanner.next(NB_ROWS_IN_BATCH * 100);
scanner.close();
if (res.length != NB_ROWS_IN_BATCH *10) {
if (i == NB_RETRIES-1) {
int lastRow = -1;
for (Result result : res) {
int currentRow = Bytes.toInt(result.getRow());
for (int row = lastRow+1; row < currentRow; row++) {
LOG.error("Row missing: " + row);
}
lastRow = currentRow;
}
LOG.error("Last row: " + lastRow);
fail("Waited too much time for normal batch replication, "
+ res.length + " instead of " + NB_ROWS_IN_BATCH *10);
} else {
LOG.info("Only got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
}
} else {
break;
}
}
}
/**
* Load up multiple tables over 2 region servers and kill a source during
* the upload. The failover happens internally.
* @throws Exception
*/
@Test
public void queueFailover() throws Exception {
utility1.createMultiRegions(htable1, famName);
// killing the RS with .META. can result into failed puts until we solve
// IO fencing
int rsToKill1 =
utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
int rsToKill2 =
utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
// Takes about 20 secs to run the full loading, kill around the middle
Thread killer1 = killARegionServer(utility1, 7500, rsToKill1);
Thread killer2 = killARegionServer(utility2, 10000, rsToKill2);
LOG.info("Start loading table");
int initialCount = utility1.loadTable(htable1, famName);
LOG.info("Done loading table");
killer1.join(5000);
killer2.join(5000);
LOG.info("Done waiting for threads");
Result[] res;
while (true) {
try {
Scan scan = new Scan();
ResultScanner scanner = htable1.getScanner(scan);
res = scanner.next(initialCount);
scanner.close();
break;
} catch (UnknownScannerException ex) {
LOG.info("Cluster wasn't ready yet, restarting scanner");
}
}
// Test we actually have all the rows, we may miss some because we
// don't have IO fencing.
if (res.length != initialCount) {
LOG.warn("We lost some rows on the master cluster!");
// We don't really expect the other cluster to have more rows
initialCount = res.length;
}
Scan scan2 = new Scan();
int lastCount = 0;
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for queueFailover replication");
}
ResultScanner scanner2 = htable2.getScanner(scan2);
Result[] res2 = scanner2.next(initialCount * 2);
scanner2.close();
if (res2.length < initialCount) {
if (lastCount < res2.length) {
i--; // Don't increment timeout if we make progress
}
lastCount = res2.length;
LOG.info("Only got " + lastCount + " rows instead of " +
initialCount + " current i=" + i);
Thread.sleep(SLEEP_TIME*2);
} else {
break;
}
}
}
private static Thread killARegionServer(final HBaseTestingUtility utility,
final long timeout, final int rs) {
Thread killer = new Thread() {
public void run() {
try {
Thread.sleep(timeout);
utility.expireRegionServerSession(rs);
} catch (Exception e) {
LOG.error(e);
}
}
};
killer.start();
return killer;
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TestReplicationSource {
private static final Log LOG =
LogFactory.getLog(TestReplicationSource.class);
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static FileSystem fs;
private static Path oldLogDir;
private static Path logDir;
private static Configuration conf = HBaseConfiguration.create();
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniDFSCluster(1);
fs = TEST_UTIL.getDFSCluster().getFileSystem();
oldLogDir = new Path(fs.getHomeDirectory(),
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(fs.getHomeDirectory(),
HConstants.HREGION_LOGDIR_NAME);
}
/**
* Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard
* time reading logs that are being archived.
* @throws Exception
*/
@Test
public void testLogMoving() throws Exception{
Path logPath = new Path(logDir, "log");
HLog.Writer writer = HLog.createWriter(fs, logPath, conf);
for(int i = 0; i < 3; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
WALEdit edit = new WALEdit();
edit.add(kv);
HLogKey key = new HLogKey(b, b, 0, 0);
writer.append(new HLog.Entry(key, edit));
writer.sync();
}
writer.close();
HLog.Reader reader = HLog.getReader(fs, logPath, conf);
HLog.Entry entry = reader.next();
assertNotNull(entry);
Path oldLogPath = new Path(oldLogDir, "log");
fs.rename(logPath, oldLogPath);
entry = reader.next();
assertNotNull(entry);
entry = reader.next();
entry = reader.next();
assertNull(entry);
}
}

View File

@ -0,0 +1,248 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestReplicationSink {
private static final Log LOG =
LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
private static final long SLEEP_TIME = 500;
private final static Configuration conf = HBaseConfiguration.create();
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static ReplicationSink SINK;
private static final byte[] TABLE_NAME1 =
Bytes.toBytes("table1");
private static final byte[] TABLE_NAME2 =
Bytes.toBytes("table2");
private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
private static HTable table1;
private static HTable table2;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
TEST_UTIL.getConfiguration().setBoolean(
HConstants.REPLICATION_ENABLE_KEY, true);
TEST_UTIL.startMiniCluster(3);
conf.setBoolean("dfs.support.append", true);
SINK = new ReplicationSink(conf,STOPPER);
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
STOPPER.set(true);
TEST_UTIL.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
Thread.sleep(SLEEP_TIME);
}
/**
* Insert a whole batch of entries
* @throws Exception
*/
@Test
public void testBatchSink() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
}
/**
* Insert a mix of puts and deletes
* @throws Exception
*/
@Test
public void testMixedPutDelete() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
for(int i = 0; i < BATCH_SIZE/2; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = createEntry(TABLE_NAME1, i,
i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
}
SINK.replicateEntries(entries);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}
/**
* Insert to 2 different tables
* @throws Exception
*/
@Test
public void testMixedPutTables() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] =
createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
}
}
/**
* Insert then do different types of deletes
* @throws Exception
*/
@Test
public void testMixedDeletes() throws Exception {
HLog.Entry[] entries = new HLog.Entry[3];
for(int i = 0; i < 3; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
entries = new HLog.Entry[3];
entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
SINK.replicateEntries(entries);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(0, scanRes.next(3).length);
}
/**
* Puts are buffered, but this tests when a delete (not-buffered) is applied
* before the actual Put that creates it.
* @throws Exception
*/
@Test
public void testApplyDeleteBeforePut() throws Exception {
HLog.Entry[] entries = new HLog.Entry[5];
for(int i = 0; i < 2; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
entries[2] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
for(int i = 3; i < 5; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Get get = new Get(Bytes.toBytes(1));
Result res = table1.get(get);
assertEquals(0, res.size());
}
private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
// Just make sure we don't get the same ts for two consecutive rows with
// same key
try {
Thread.sleep(1);
} catch (InterruptedException e) {
LOG.info("Was interrupted while sleep, meh", e);
}
final long now = System.currentTimeMillis();
KeyValue kv = null;
if(type.getCode() == KeyValue.Type.Put.getCode()) {
kv = new KeyValue(rowBytes, fam, fam, now,
KeyValue.Type.Put, Bytes.toBytes(row));
} else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
kv = new KeyValue(rowBytes, fam, fam,
now, KeyValue.Type.DeleteColumn);
} else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
kv = new KeyValue(rowBytes, fam, null,
now, KeyValue.Type.DeleteFamily);
}
HLogKey key = new HLogKey(table, table, now, now);
WALEdit edit = new WALEdit();
edit.add(kv);
return new HLog.Entry(key, edit);
}
}

View File

@ -0,0 +1,209 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.URLEncoder;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
public class TestReplicationSourceManager {
private static final Log LOG =
LogFactory.getLog(TestReplicationSourceManager.class);
private static Configuration conf;
private static HBaseTestingUtility utility;
private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
private static ReplicationSourceManager manager;
private static ZooKeeperWrapper zkw;
private static HTableDescriptor htd;
private static HRegionInfo hri;
private static final byte[] r1 = Bytes.toBytes("r1");
private static final byte[] r2 = Bytes.toBytes("r2");
private static final byte[] f1 = Bytes.toBytes("f1");
private static final byte[] f2 = Bytes.toBytes("f2");
private static final byte[] test = Bytes.toBytes("test");
private static FileSystem fs;
private static Path oldLogDir;
private static Path logDir;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
zkw = ZooKeeperWrapper.createInstance(conf, "test");
zkw.writeZNode("/hbase", "replication", "");
zkw.writeZNode("/hbase/replication", "master",
conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1");
zkw.writeZNode("/hbase/replication/peers", "1",
conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1");
HRegionServer server = new HRegionServer(conf);
ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
server.getZooKeeperWrapper(), conf,
REPLICATING, "123456789");
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getTestDir(),
HConstants.HREGION_LOGDIR_NAME);
manager = new ReplicationSourceManager(helper,
conf, STOPPER, fs, REPLICATING, oldLogDir);
manager.addSource("1");
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor("f1");
col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
htd.addFamily(col);
col = new HColumnDescriptor("f2");
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
hri = new HRegionInfo(htd, r1, r2);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
manager.join();
utility.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
fs.delete(logDir, true);
fs.delete(oldLogDir, true);
}
@After
public void tearDown() throws Exception {
setUp();
}
@Test
public void testLogRoll() throws Exception {
long seq = 0;
long baseline = 1000;
long time = baseline;
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, manager,
URLEncoder.encode("regionserver:60020", "UTF8"));
manager.init();
// Testing normal log rolling every 20
for(long i = 1; i < 101; i++) {
if(i > 1 && i % 20 == 0) {
hlog.rollWriter();
}
LOG.info(i);
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
hlog.append(hri, key, edit);
}
// Simulate a rapid insert that's followed
// by a report that's still not totally complete (missing last one)
LOG.info(baseline + " and " + time);
baseline += 101;
time = baseline;
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
hlog.append(hri, key, edit);
}
assertEquals(6, manager.getHLogs().size());
hlog.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false);
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
hlog.append(hri, key, edit);
assertEquals(1, manager.getHLogs().size());
// TODO Need a case with only 2 HLogs and we only want to delete the first one
}
}