HBASE-2565 Remove contrib module from hbase

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@945916 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-05-18 21:55:53 +00:00
parent c83d862620
commit ef92b957cd
18 changed files with 2 additions and 2063 deletions

View File

@@ -22,6 +22,7 @@ Release 0.21.0 - Unreleased
(Todd Lipcon via Stack)
HBASE-2541 Remove transactional contrib (Clint Morgan via Stack)
HBASE-2542 Fold stargate contrib into core
HBASE-2565 Remove contrib module from hbase
BUG FIXES
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew

View File

@@ -1,83 +0,0 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script to add a peer to a cluster
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main add_peer.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "add_peer"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
c2 = HBaseConfiguration.create()
parts2 = ARGV[1].split(":")
LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
LOG.info("The addresses must be exactly the same as those in hbase-site.xml of each cluster.")
print "Are those info correct? [Y/n] "
answer = $stdin.gets.chomp
if answer.length != 0 || answer == "n" || answer == "no"
exit!
end
c1 = HBaseConfiguration.create()
c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
zkw1 = ZooKeeperWrapper.new(c1, EmptyWatcher.instance)
zkw1.writeZNode(parts1[2], "replication", "a")
zkw1.writeZNode(parts1[2] + "/replication", "master", ARGV[0]);
zkw1.writeZNode(parts1[2] + "/replication", "state", "true");
zkw1.writeZNode(parts1[2] + "/replication/peers", "test", ARGV[1]);
c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
zkw2 = ZooKeeperWrapper.new(c2, EmptyWatcher.instance)
zkw2.writeZNode(parts2[2], "replication", "a")
zkw2.writeZNode(parts2[2] + "/replication", "master", ARGV[0]);

View File

@@ -1,81 +0,0 @@
#
# Copyright 2010 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script to recreate all tables from one cluster to another
# To see usage for this script, run:
#
# ${HBASE_HOME}/bin/hbase org.jruby.Main copy_tables_desc.rb
#
include Java
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.EmptyWatcher
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper
# Name of this script
NAME = "copy_tables_desc"
# Print usage for this script
def usage
puts 'Usage: %s.rb master_zookeeper.quorum.peers:clientport:znode_parent slave_zookeeper.quorum.peers:clientport:znode_parent' % NAME
exit!
end
if ARGV.size != 2
usage
end
LOG = LogFactory.getLog(NAME)
parts1 = ARGV[0].split(":")
LOG.info("Master cluster located at " + parts1[0] + " port " + parts1[1] + " in folder " + parts1[2])
parts2 = ARGV[1].split(":")
LOG.info("Slave cluster located at " + parts2[0] + " port " + parts2[1] + " in folder " + parts2[2])
print "Are those info correct? [Y/n] "
answer = $stdin.gets.chomp
if answer.length != 0 || answer == "n" || answer == "no"
exit!
end
c1 = HBaseConfiguration.create()
c1.set(HConstants.ZOOKEEPER_QUORUM, parts1[0])
c1.set("hbase.zookeeper.property.clientPort", parts1[1])
c1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts1[2])
admin1 = HBaseAdmin.new(c1)
c2 = HBaseConfiguration.create()
c2.set(HConstants.ZOOKEEPER_QUORUM, parts2[0])
c2.set("hbase.zookeeper.property.clientPort", parts2[1])
c2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts2[2])
admin2 = HBaseAdmin.new(c2)
for t in admin1.listTables()
admin2.createTable(t)
end

View File

@@ -1,45 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hbase-contrib-mdc_replication</artifactId>
<packaging>jar</packaging>
<name>HBase Contrib - Multi Datacenter Replication</name>
<parent>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-contrib</artifactId>
<version>0.21.0-SNAPSHOT</version>
</parent>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>always</forkMode>
<argLine>-Xmx1024m</argLine>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hbase-core</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hbase-core</artifactId>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,40 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import java.io.IOException;
/**
* Interface that defines replication
*/
public interface ReplicationRegionInterface extends HRegionInterface {
/**
* Replicates the given entries. The guarantee is that the given entries
* will be durable on the slave cluster if this method returns without
* an exception.
* @param entries entries to replicate
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
}
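
As a hedged illustration of how this interface was meant to be consumed (not part of the removed files), a master-side caller could obtain the remote interface through the ReplicationConnectionManager removed later in this commit and push a batch of WAL entries to a slave region server. The class and method names in the sketch below are hypothetical; only the calls to getHRegionConnection and replicateLogEntries come from the removed code.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;
import java.io.IOException;
// Hypothetical caller: ship a batch of WAL entries to a single slave region server.
public class ReplicateBatchSketch {
  public static void ship(Configuration conf, HServerAddress slave,
      HLog.Entry[] entries) throws IOException {
    ReplicationConnectionManager conn = new ReplicationConnectionManager(conf);
    ReplicationRegionInterface rrs = conn.getHRegionConnection(slave);
    // Per the interface contract above, the entries are durable on the slave
    // cluster once this call returns without an exception.
    rrs.replicateLogEntries(entries);
  }
}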

View File

@@ -1,104 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* Specialized version of HRegion to handle replication. In particular,
* it replays all edits from the reconstruction log.
*/
public class ReplicationRegion extends HRegion {
static final Log LOG = LogFactory.getLog(ReplicationRegion.class);
private final ReplicationSource replicationSource;
public ReplicationRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener,
ReplicationSource repSource) {
super(basedir, log, fs, conf, regionInfo, flushListener);
this.replicationSource = repSource;
}
protected void doReconstructionLog(final Path oldLogFile,
final long minSeqId, final long maxSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
if(this.replicationSource == null) {
return;
}
if (oldLogFile == null || !getFilesystem().exists(oldLogFile)) {
return;
}
FileStatus[] stats = getFilesystem().listStatus(oldLogFile);
if (stats == null || stats.length == 0) {
LOG.warn("Passed reconstruction log " + oldLogFile
+ " is zero-length");
}
HLog.Reader reader = HLog.getReader(getFilesystem(), oldLogFile, getConf());
try {
HLog.Entry entry;
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
if (key.getLogSeqNum() < maxSeqId) {
continue;
}
// Don't replicate catalog entries and meta information like
// complete log flush.
if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) ||
Bytes.equals(key.getTablename(),META_TABLE_NAME)) &&
!Bytes.equals(val.getFamily(), HLog.METAFAMILY) &&
key.getScope() == REPLICATION_SCOPE_GLOBAL) {
this.replicationSource.enqueueLog(entry);
}
}
} finally {
reader.close();
}
}
}

View File

@@ -1,166 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.replication.ReplicationHLog;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReplicationRegionServer extends HRegionServer
implements ReplicationRegionInterface {
protected static final Log LOG =
LogFactory.getLog(ReplicationRegionServer.class);
private final ReplicationSource replicationSource;
private ReplicationSink replicationSink;
private final boolean isMaster;
private final AtomicBoolean isReplicating = new AtomicBoolean(true);
private final ReplicationZookeeperHelper zkHelper;
/**
* Starts an HRegionServer at the default location
*
* @param conf configuration to use
* @throws java.io.IOException if the server cannot be started
*/
public ReplicationRegionServer(Configuration conf) throws IOException {
super(conf);
this.zkHelper = new ReplicationZookeeperHelper(
this.getZooKeeperWrapper(), this.conf, this.isReplicating);
this.isMaster = zkHelper.isMaster();
this.replicationSink = null;
this.replicationSource = this.isMaster ? new ReplicationSource(this,
super.stopRequested, this.isReplicating) : null;
}
@Override
protected HLog instantiateHLog(Path logdir, Path oldLogDir)
throws IOException {
HLog newlog = new ReplicationHLog(super.getFileSystem(),
logdir, oldLogDir, conf, super.getLogRoller(),
this.replicationSource);
return newlog;
}
@Override
protected void init(final MapWritable c) throws IOException {
super.init(c);
String n = Thread.currentThread().getName();
String repLogPathStr =
ReplicationSink.getRepLogPath(getHServerInfo().getServerName());
Path repLogPath = new Path(getRootDir(), repLogPathStr);
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
abort();
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
if(this.isMaster) {
Threads.setDaemonThreadRunning(
this.replicationSource, n + ".replicationSource", handler);
} else {
this.replicationSink =
new ReplicationSink(conf,super.stopRequested,
repLogPath, getFileSystem(), getThreadWakeFrequency());
Threads.setDaemonThreadRunning(
this.replicationSink, n + ".replicationSink", handler);
}
}
@Override
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new ReplicationRegion(HTableDescriptor.getTableDir(super
.getRootDir(), regionInfo.getTableDesc().getName()), super.hlog, super
.getFileSystem(), super.conf, regionInfo,
super.getFlushRequester(), this.replicationSource);
r.initialize(null, new Progressable() {
public void progress() {
addProcessingMessage(regionInfo);
}
});
return r;
}
@Override
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
this.replicationSink.replicateEntries(entries);
}
/**
* Gets the protocol version for the given interface.
* @param protocol protocol interface class name requested by the client
* @param clientVersion protocol version the client speaks
* @return the matching server-side protocol version
* @throws IOException if the protocol is not recognized
*/
public long getProtocolVersion(final String protocol,
final long clientVersion)
throws IOException {
if (protocol.equals(ReplicationRegionInterface.class.getName())) {
return HBaseRPCProtocolVersion.versionID;
}
throw new IOException("Unknown protocol to name node: " + protocol);
}
/**
* @return the ZooKeeper helper used for replication
*/
public ReplicationZookeeperHelper getZkHelper() {
return zkHelper;
}
protected void join() {
super.join();
if(this.isMaster) {
Threads.shutdown(this.replicationSource);
} else {
Threads.shutdown(this.replicationSink);
}
}
}

View File

@@ -1,283 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* This class is responsible for replicating the edits coming
* from another cluster. All edits are first put into a log that will
* be read later by the main thread.
*
* This replication process is currently waiting for the edits to be applied
* before any other entry can be appended to the log.
*
* The log is rolled but old ones aren't kept at the moment.
*/
public class ReplicationSink extends Thread {
public static final String REPLICATION_LOG_DIR = ".replogs";
static final Log LOG = LogFactory.getLog(ReplicationSink.class);
private final Configuration conf;
private final HTablePool pool;
private final AtomicBoolean stop;
private HLog.Reader reader;
private HLog.Writer writer;
private final FileSystem fs;
private Path path;
private long position = 0;
private final Lock lock = new ReentrantLock();
private final Condition newData = lock.newCondition();
private final AtomicLong editsSize = new AtomicLong(0);
private long lastEditSize = 0;
private final long logrollsize;
private final long threadWakeFrequency;
/**
* Create a sink for replication
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @param path the path to the log
* @param fs the filesystem to use
* @param threadWakeFrequency how long should the thread wait for edits
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(final Configuration conf,
final AtomicBoolean stopper, Path path,
FileSystem fs, long threadWakeFrequency)
throws IOException {
this.conf = conf;
this.pool = new HTablePool(this.conf, 10);
this.stop = stopper;
this.fs = fs;
this.path = path;
long blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
this.logrollsize = (long)(blocksize * multi);
this.threadWakeFrequency = threadWakeFrequency;
rollLog();
}
/**
* Put this array of entries into a log that will be read later
* @param entries
* @throws IOException
*/
public void replicateEntries(HLog.Entry[] entries)
throws IOException {
try {
this.lock.lock();
if(!this.stop.get()) {
// add to WAL and defer actual inserts
try {
for(HLog.Entry entry : entries) {
this.writer.append(entry);
this.editsSize.addAndGet(entry.getKey().heapSize() +
entry.getEdit().heapSize());
}
this.writer.sync();
this.newData.signal();
} catch (IOException ioe) {
LOG.error("Unable to accept edit because", ioe);
throw ioe;
}
} else {
LOG.info("Won't be replicating data as we are shutting down");
}
} finally {
this.lock.unlock();
}
}
public void run() {
try {
HTableInterface table = null;
this.lock.lock();
while (!this.stop.get()) {
this.newData.await(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
try {
if(this.lastEditSize == this.editsSize.get()) {
continue;
}
// There's no tailing in HDFS so we create a new reader
// and seek every time
this.reader = HLog.getReader(this.fs, this.path, this.conf);
if (position != 0) {
this.reader.seek(position);
}
byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
List<Put> puts = new ArrayList<Put>();
// Very simple optimization where we batch sequences of rows going
// to the same table.
HLog.Entry entry = new HLog.Entry();
while (this.reader.next(entry) != null) {
KeyValue kv = entry.getEdit();
if (kv.isDelete()) {
Delete delete = new Delete(kv.getRow(), kv.getTimestamp(), null);
if (kv.isDeleteFamily()) {
delete.deleteFamily(kv.getFamily());
} else if (!kv.isEmptyColumn()) {
delete.deleteColumn(entry.getEdit().getFamily(),
kv.getQualifier());
}
table = pool.getTable(entry.getKey().getTablename());
table.delete(delete);
pool.putTable(table);
} else {
Put put = new Put(kv.getRow(), kv.getTimestamp(), null);
put.add(entry.getEdit().getFamily(),
kv.getQualifier(), kv.getValue());
// Switching table, flush
if (!Bytes.equals(lastTable, entry.getKey().getTablename())
&& !puts.isEmpty()) {
table = pool.getTable(lastTable);
table.put(puts);
pool.putTable(table);
puts.clear();
}
lastTable = entry.getKey().getTablename();
puts.add(put);
}
}
if (!puts.isEmpty()) {
table = pool.getTable(lastTable);
table.put(puts);
pool.putTable(table);
}
position = this.reader.getPosition();
if(this.editsSize.get() > this.logrollsize) {
rollLog();
}
this.lastEditSize = editsSize.get();
} catch (EOFException eof) {
LOG.warn("Got EOF while reading, will continue on next notify");
} catch (TableNotFoundException ex) {
LOG.warn("Losing edits because: " + ex);
} finally {
this.newData.signal();
if(this.reader != null) {
this.reader.close();
}
this.reader = null;
}
}
close();
} catch (Exception ex) {
// Should we log rejected edits in a file for replay?
LOG.error("Unable to accept edit because", ex);
this.stop.set(true);
} finally {
this.lock.unlock();
}
}
private void close() throws IOException {
this.writer.close();
if(reader != null) {
this.reader.close();
}
this.fs.delete(this.path,true);
}
// Delete the current log and start a new one with the same name.
// TODO: keep the old versions so that the writing thread isn't held up
// by the reading thread, which could then keep reading older logs.
// At this point we are under the lock.
protected void rollLog() throws IOException {
if (this.editsSize.get() != 0) {
this.writer.close();
if(this.reader != null) {
this.reader.close();
}
this.fs.delete(this.path,true);
}
this.writer = HLog.createWriter(this.fs, this.path, this.conf);
this.editsSize.set(0);
this.position = 0;
LOG.debug("New replication log");
}
/**
* Get the path of the replication log for this server
* @param serverName name of the region server
* @return the log path, relative to the HBase root directory
*/
public static String getRepLogPath(String serverName) {
StringBuilder dirName = new StringBuilder(REPLICATION_LOG_DIR);
dirName.append("/");
dirName.append(serverName);
return dirName.toString();
}
}

View File

@@ -1,166 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.ReplicationConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperHelper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave.
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if the replication ratio is 0.1
* and the slave cluster has 100 region servers, 10 will be selected.
*/
public class ReplicationSource extends Chore implements HConstants {
static final Log LOG = LogFactory.getLog(ReplicationSource.class);
private final LinkedBlockingQueue<HLog.Entry> queue =
new LinkedBlockingQueue<HLog.Entry>();
private final List<HLog.Entry> tempArray = new ArrayList<HLog.Entry>();
private final HLog.Entry[] dummyArray = new HLog.Entry[0];
private final ReplicationConnectionManager conn;
private final ReplicationZookeeperHelper zkHelper;
private final Configuration conf;
private final float ratio;
private final Random random;
private final AtomicBoolean isReplicating;
private final byte clusterId;
private List<HServerAddress> currentPeers;
/**
* Constructor used by region servers
* @param server the region server specialized in replication
* @param stopper the atomic boolean to use to stop the cluster
* @param isReplicating the atomic boolean that starts/stops replication
* @throws IOException
*/
public ReplicationSource(final ReplicationRegionServer server,
final AtomicBoolean stopper,
final AtomicBoolean isReplicating)
throws IOException {
super(server.getThreadWakeFrequency(), stopper);
this.conf = server.getConfiguration();
this.conn = new ReplicationConnectionManager(this.conf);
this.zkHelper = server.getZkHelper();
this.ratio = this.conf.getFloat("replication.ratio", 0.1f);
currentPeers = new ArrayList<HServerAddress>();
this.random = new Random();
this.clusterId = zkHelper.getClusterId();
this.isReplicating = isReplicating;
}
@Override
protected boolean initialChore() {
this.chooseSinksForPeer(0);
return currentPeers.size() > 0;
}
/**
* Select a number of peers at random using the ratio. Minimum 1.
* @param index index of the peer cluster to select sinks from
*/
private void chooseSinksForPeer(int index) {
this.currentPeers.clear();
List<HServerAddress> addresses = this.zkHelper.getPeersAddresses(index);
Map<String, HServerAddress> mapOfAdr =
new HashMap<String, HServerAddress>();
LOG.info("Considering " + addresses.size() +
" rs, with ratio " + ratio);
int nbPeers = (int)(Math.ceil (addresses.size()*ratio));
LOG.info("Getting " + nbPeers + " rs from peer cluster # " + index);
for(int i = 0; i < nbPeers; i++) {
HServerAddress adr =
addresses.get(this.random.nextInt(addresses.size()));
while(mapOfAdr.containsKey(adr.toString())) {
adr = addresses.get(this.random.nextInt(addresses.size()));
}
LOG.info("Choosing peer " + adr.toString());
mapOfAdr.put(adr.toString(), adr);
}
this.currentPeers.addAll(mapOfAdr.values());
}
/**
* Put a log entry in a replication queue if replication is enabled
* @param logEntry the WAL entry to enqueue for replication
*/
public void enqueueLog(HLog.Entry logEntry) {
if(this.isReplicating.get()) {
logEntry.getKey().setClusterId(this.clusterId);
this.queue.add(logEntry);
}
}
@Override
protected void chore() {
while(!super.stop.get()) {
// Drain the edits accumulated in the queue, select a node at random
// and send the edits. If it fails, get a new set of nodes and choose
// a new one to replicate to.
try {
this.queue.drainTo(this.tempArray);
if(this.tempArray.size() > 0) {
HServerAddress adr =
currentPeers.get(random.nextInt(this.currentPeers.size()));
ReplicationRegionInterface rrs = this.conn.getHRegionConnection(adr);
LOG.debug("Replicating " + this.tempArray.size()
+ " to " + adr.toString());
rrs.replicateLogEntries(this.tempArray.toArray(dummyArray));
this.tempArray.clear();
}
return;
}
catch (IOException ioe) {
LOG.warn("Unable to replicate, retrying with a new node", ioe);
try{
Thread.sleep(1000);
} catch (InterruptedException e){
// continue
}
// Should wait in a backoff fashion?
// make sure we don't retry with the same node
chooseSinksForPeer(0);
}
}
}
}
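
As a quick sanity check of the fan-out math used by chooseSinksForPeer() above, here is a standalone sketch (not part of the removed module); the class name and the 100-server figure are illustrative only.
// Reproduces the peer-selection arithmetic: ceil(slave region servers * ratio).
public class PeerFanoutSketch {
  public static void main(String[] args) {
    float ratio = 0.1f;           // default value of "replication.ratio"
    int slaveRegionServers = 100; // size of the slave cluster in the javadoc example
    int nbPeers = (int) Math.ceil(slaveRegionServers * ratio);
    System.out.println(nbPeers);  // prints 10; ceil keeps this >= 1 for any non-empty peer list
  }
}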

View File

@@ -1,86 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
import java.io.IOException;
/**
* HLog specialized in replication. It replicates every entry from every
* user table at the moment.
*/
public class ReplicationHLog extends HLog {
static final Log LOG = LogFactory.getLog(ReplicationHLog.class);
private ReplicationSource replicationSource;
private final boolean isReplicator;
/**
* New constructor used for replication
* @param fs filesystem to use
* @param dir directory to store the wal
* @param oldLogDir directory where rolled logs are moved
* @param conf conf to use
* @param listener log listener to pass to super class
* @param replicationSource where to put the entries
* @throws IOException
*/
public ReplicationHLog(final FileSystem fs, final Path dir,
final Path oldLogDir, final Configuration conf,
final LogRollListener listener,
ReplicationSource replicationSource)
throws IOException {
super(fs, dir, oldLogDir, conf, listener);
this.replicationSource = replicationSource;
this.isReplicator = this.replicationSource != null;
}
@Override
protected void doWrite(HRegionInfo info, HLogKey logKey,
KeyValue logEdit)
throws IOException {
logKey.setScope(info.getTableDesc().getFamily(logEdit.getFamily()).getScope());
super.doWrite(info, logKey, logEdit);
if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion()) &&
logKey.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL) {
this.replicationSource.enqueueLog(new Entry(logKey, logEdit));
}
}
public ReplicationSource getReplicationSource() {
return this.replicationSource;
}
}
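
doWrite() above only enqueues edits whose column family carries a global replication scope. As a hedged sketch mirroring the table setup in the tests further down in this commit, a family is marked for replication like this (the table and family names here are hypothetical):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
// Create a table whose family "f" is replicated to the slave cluster.
public class ReplicatedTableSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTableDescriptor table = new HTableDescriptor(Bytes.toBytes("replicated_table"));
    HColumnDescriptor fam = new HColumnDescriptor(Bytes.toBytes("f"));
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); // the scope checked by doWrite()
    table.addFamily(fam);
    new HBaseAdmin(conf).createTable(table);
  }
}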

View File

@@ -1,97 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Connection manager to communicate with the other clusters.
*/
public class ReplicationConnectionManager implements HConstants {
private final int numRetries;
private final int maxRPCAttempts;
private final long rpcTimeout;
private final Map<String, ReplicationRegionInterface> servers =
new ConcurrentHashMap<String, ReplicationRegionInterface>();
private final
Class<? extends ReplicationRegionInterface> serverInterfaceClass;
private final Configuration conf;
/**
* Constructor that sets up RPC to other clusters
* @param conf configuration to use for connections to the peer clusters
*/
public ReplicationConnectionManager(Configuration conf) {
this.conf = conf;
String serverClassName =
conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS);
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
this.rpcTimeout = conf.getLong(HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
try {
this.serverInterfaceClass =
(Class<? extends ReplicationRegionInterface>)
Class.forName(serverClassName);
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
"Unable to find region server interface " + serverClassName, e);
}
}
/**
* Get a connection to a distant region server for replication
* @param regionServer the address to use
* @return the connection to the region server
* @throws IOException
*/
public ReplicationRegionInterface getHRegionConnection(
HServerAddress regionServer)
throws IOException {
ReplicationRegionInterface server;
synchronized (this.servers) {
// See if we already have a connection
server = this.servers.get(regionServer.toString());
if (server == null) { // Get a connection
try {
server = (ReplicationRegionInterface) HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
this.maxRPCAttempts, this.rpcTimeout);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
this.servers.put(regionServer.toString(), server);
}
}
return server;
}
}

View File

@@ -1,184 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class serves as a helper for all things related to zookeeper
* in the replication contrib.
*/
public class ReplicationZookeeperHelper implements HConstants, Watcher {
static final Log LOG = LogFactory.getLog(ReplicationZookeeperHelper.class);
private final ZooKeeperWrapper zookeeperWrapper;
private final List<ZooKeeperWrapper> peerClusters;
private final String replicationZNode;
private final String peersZNode;
private final String replicationStateNodeName;
private final boolean master;
private final Configuration conf;
private final AtomicBoolean isReplicating;
private final byte clusterId;
/**
* Constructor used by region servers
* @param zookeeperWrapper zkw to wrap
* @param conf conf to use
* @param isReplicating atomic boolean to start/stop replication
* @throws IOException
*/
public ReplicationZookeeperHelper(
ZooKeeperWrapper zookeeperWrapper, Configuration conf,
final AtomicBoolean isReplicating) throws IOException{
this.zookeeperWrapper = zookeeperWrapper;
this.conf = conf;
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.peers", "peers");
String repMasterZNodeName =
conf.get("zookeeper.znode.master", "master");
this.replicationStateNodeName =
conf.get("zookeeper.znode.state", "state");
String clusterIdName =
conf.get("zookeeper.znode.clusterId", "clusterId");
this.peerClusters = new ArrayList<ZooKeeperWrapper>();
this.replicationZNode = zookeeperWrapper.getZNode(
zookeeperWrapper.getParentZNode(),replicationZNodeName);
this.peersZNode =
zookeeperWrapper.getZNode(replicationZNode,peersZNodeName);
List<String> znodes =
this.zookeeperWrapper.listZnodes(this.peersZNode, this);
if(znodes != null) {
for(String znode : znodes) {
connectToPeer(znode);
}
}
String address = this.zookeeperWrapper.getData(this.replicationZNode,
repMasterZNodeName);
String idResult = this.zookeeperWrapper.getData(this.replicationZNode,
clusterIdName);
this.clusterId =
idResult == null ? DEFAULT_CLUSTER_ID : Byte.valueOf(idResult);
String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+
this.conf.get("hbase.zookeeper.property.clientPort") +":" +
this.conf.get(ZOOKEEPER_ZNODE_PARENT);
this.master = thisCluster.equals(address);
LOG.info("This cluster (" + thisCluster + ") is a "
+ (this.master ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
this.isReplicating = isReplicating;
setIsReplicating();
}
/**
* Returns all region servers from given peer
* @param clusterIndex the cluster to interrogate
* @return addresses of all region servers
*/
public List<HServerAddress> getPeersAddresses(int clusterIndex) {
return this.peerClusters.size() == 0 ?
null : this.peerClusters.get(clusterIndex).scanRSDirectory();
}
// This method connects this cluster to another one and registers it
private void connectToPeer(String znode) throws IOException {
String[] quorum =
this.zookeeperWrapper.getData(this.peersZNode, znode).split(":");
if(quorum.length == 3) {
Configuration otherConf = new Configuration(this.conf);
otherConf.set(ZOOKEEPER_QUORUM, quorum[0]);
otherConf.set("hbase.zookeeper.property.clientPort", quorum[1]);
otherConf.set(ZOOKEEPER_ZNODE_PARENT, quorum[2]);
this.peerClusters.add(new ZooKeeperWrapper(otherConf, this));
LOG.info("Added new peer cluster " + StringUtils.arrayToString(quorum));
} else {
LOG.error("Wrong format of cluster address: " +
this.zookeeperWrapper.getData(this.peersZNode, znode));
}
}
/**
* Tells if this cluster replicates or not
* @return if this is a master
*/
public boolean isMaster() {
return master;
}
/**
* Get the identification of the cluster
* @return the id for the cluster
*/
public byte getClusterId() {
return this.clusterId;
}
@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
LOG.info(("Got event " + type + " with path " + watchedEvent.getPath()));
if (type.equals(Event.EventType.NodeDataChanged)) {
setIsReplicating();
}
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
private void setIsReplicating() {
String value = this.zookeeperWrapper.getDataAndWatch(
this.replicationZNode, this.replicationStateNodeName, this);
if(value != null) {
isReplicating.set(value.equals("true"));
LOG.info("Replication is now " + (isReplicating.get() ?
"started" : "stopped"));
}
}
}

View File

@@ -1,136 +0,0 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<!--
Copyright 2010 The Apache Software Foundation
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head />
<body bgcolor="white">
<h1>Multi Data Center Replication</h1>
This package provides replication between HBase clusters.
<p>
<h2>Table Of Contents</h2>
<ol>
<li><a href="#status">Status</a></li>
<li><a href="#requirements">Requirements</a></li>
<li><a href="#deployment">Deployment</a></li>
</ol>
<p>
<a name="status">
<h2>Status</h2>
</a>
<p>
This package isn't even alpha quality software and is only meant to be a base
for future developments. The current implementation offers the following
features:
<ol>
<li>Master/Slave replication limited to 1 slave. </li>
<li>Replication of all user tables.</li>
<li>Start/stop replication stream.</li>
<li>Supports clusters of different sizes.</li>
<li>Re-replication of entries from failed region
servers on the master cluster.</li>
</ol>
Please report bugs on the project's Jira when found.
<p>
<a name="requirements">
<h2>Requirements</h2>
</a>
<p>
Before trying out replication, make sure to review the following requirements:
<ol>
<li>ZooKeeper should be managed by you, not by HBase, and should
always be available during the deployment. Currently you can't use
zoo.cfg to hold your Zookeeper configurations.</li>
<li>All machines from both clusters should be able to reach every
other machine since replication goes from any region server to any
other one on the slave cluster. That also includes the
Zookeeper clusters.</li>
<li>Both clusters should have the same HBase and Hadoop major revision.
For example, having 0.21.1 on the master and 0.21.0 on the slave is
correct but not 0.21.1 and 0.22.0.</li>
<li>Every table should exist with the exact same name and column
family names on both clusters.</li>
</ol>
<p>
<a name="deployment">
<h2>Deployment</h2>
</a>
<p>
The following steps describe how to enable replication from one cluster
to another. This must be done with both clusters offline.
<ol>
<li>Copy the hbase-0.21.0-dev-mdc_replication.jar file from the
$HBASE_HOME/contrib/mdc_replication/ folder to $HBASE_HOME/lib on
both clusters.</li>
<li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters to add
the following configurations:
<pre>
&lt;property&gt;
&lt;name&gt;hbase.regionserver.class&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.hbase.ipc.ReplicationRegionInterface&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;hbase.regionserver.impl&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer&lt;/value&gt;
&lt;/property&gt;</pre>
</li>
<li>Run the following command on any cluster:
<pre>
$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/src/contrib/mdc_replication/bin/add_peer.rb</pre>
This will show you the help to set up the replication stream between
both clusters; the znodes it creates are sketched after this list.
If both clusters use the same ZooKeeper cluster, you have
to use a different <b>zookeeper.znode.parent</b> since they can't
write in the same folder.
</li>
<li>You can now start and stop the clusters with your preferred method.</li>
</ol>
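<p>
As a rough sketch (based on the add_peer.rb script removed in this commit), the
znodes created under each cluster's <b>zookeeper.znode.parent</b> look like:
<pre>
/&lt;znode_parent&gt;/replication/master       master_quorum:clientport:znode_parent
/&lt;znode_parent&gt;/replication/state        true | false
/&lt;znode_parent&gt;/replication/peers/test   slave_quorum:clientport:znode_parent</pre>
<p>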
You can confirm that your setup works by looking at any region server's log
on the master cluster for the following lines:
<pre>
Considering 1 rs, with ratio 0.1
Getting 1 rs from peer cluster # 0
Choosing peer 10.10.1.49:62020</pre>
In this case it indicates that 1 region server from the slave cluster
was chosen for replication.<br><br>
Should you want to stop the replication while the clusters are running, open
the shell on the master cluster and issue this command:
<pre>
hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
Replace the znode parent with the one configured on your master
cluster. Edits that were already queued will still be replicated after you
issue that command, but new entries won't be. To start replication again,
simply replace "false" with "true" in the command.
<p>
</body>
</html>

View File

@@ -1,219 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.fs.Path;
import java.util.concurrent.atomic.AtomicBoolean;
public class TestReplicationSink {
protected static final Log LOG =
LogFactory.getLog(TestReplicationSink.class);
private static final int BATCH_SIZE = 10;
private static final long SLEEP_TIME = 500;
private final static Configuration conf = HBaseConfiguration.create();
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static ReplicationSink SINK;
private static final byte[] TABLE_NAME1 =
Bytes.toBytes("table1");
private static final byte[] TABLE_NAME2 =
Bytes.toBytes("table2");
private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
private static HTable table1;
private static HTable table2;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
Path repLogPath = new Path(TEST_UTIL.getTestDir(),
ReplicationSink.getRepLogPath("test_rep_sink"));
SINK = new ReplicationSink(conf, STOPPER,
repLogPath,
TEST_UTIL.getDFSCluster().getFileSystem(), 1000);
SINK.start();
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
STOPPER.set(true);
TEST_UTIL.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
Thread.sleep(SLEEP_TIME);
}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {}
@Test
public void testBatchSink() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(scanRes.next(BATCH_SIZE).length, BATCH_SIZE);
}
@Test
public void testMixedPutDelete() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
for(int i = 0; i < BATCH_SIZE; i+=2) {
entries[i/2] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] = createEntry(TABLE_NAME1, i,
i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(BATCH_SIZE/2,scanRes.next(BATCH_SIZE).length);
}
@Test
public void testMixedPutTables() throws Exception {
HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
for(int i = 0; i < BATCH_SIZE; i++) {
entries[i] =
createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
}
}
@Test
public void testMixedDeletes() throws Exception {
HLog.Entry[] entries = new HLog.Entry[3];
for(int i = 0; i < 3; i++) {
entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
}
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
entries = new HLog.Entry[3];
entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
SINK.replicateEntries(entries);
Thread.sleep(SLEEP_TIME);
Scan scan = new Scan();
ResultScanner scanRes = table1.getScanner(scan);
assertEquals(0, scanRes.next(3).length);
}
@Test
public void testRolling() throws Exception {
testMixedDeletes();
SINK.rollLog();
testMixedDeletes();
SINK.rollLog();
testMixedPutTables();
}
private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
final long now = System.currentTimeMillis();
KeyValue kv = null;
if(type.getCode() == KeyValue.Type.Put.getCode()) {
kv = new KeyValue(rowBytes, fam, fam, now,
KeyValue.Type.Put, Bytes.toBytes(row));
} else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
kv = new KeyValue(rowBytes, fam, fam,
now, KeyValue.Type.DeleteColumn);
} else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
kv = new KeyValue(rowBytes, fam, null,
now, KeyValue.Type.DeleteFamily);
}
HLogKey key = new HLogKey(table, table, now, now);
return new HLog.Entry(key, kv);
}
}

View File

@@ -1,331 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.EmptyWatcher;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ipc.ReplicationRegionInterface;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
import org.junit.AfterClass;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestReplication implements HConstants{
protected static final Log LOG = LogFactory.getLog(TestReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static ZooKeeperWrapper zkw1;
private static ZooKeeperWrapper zkw2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static final int NB_ROWS_IN_BATCH = 100;
private static final long SLEEP_TIME = 500; //ms
private static final int NB_RETRIES = 10;
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] noRepfamName = Bytes.toBytes("norep");
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
try {
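// Cluster 1 (the master): use the replication-aware RPC interface and region server
// implementation, with its ZooKeeper znodes rooted under /1.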
conf1 = HBaseConfiguration.create();
conf1.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
.getName());
conf1.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
.getName());
conf1.set(ZOOKEEPER_ZNODE_PARENT, "/1");
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = new ZooKeeperWrapper(conf1, EmptyWatcher.instance);
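// Record cluster 1's own address as the replication master under /1/replication,
// then enable replication via the state znode.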
zkw1.writeZNode("/1", "replication", "");
zkw1.writeZNode("/1/replication", "master",
conf1.get(ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
setIsReplication("true");
LOG.info("Setup first Zk");
conf2 = HBaseConfiguration.create();
conf2.set(REGION_SERVER_CLASS, ReplicationRegionInterface.class
.getName());
conf2.set(REGION_SERVER_IMPL, ReplicationRegionServer.class
.getName());
conf2.set(ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = new ZooKeeperWrapper(conf2, EmptyWatcher.instance);
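// Point the slave's /2/replication/master znode back at cluster 1, and register
// cluster 2 as peer "test" on the master side.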
zkw2.writeZNode("/2", "replication", "");
zkw2.writeZNode("/2/replication", "master",
conf1.get(ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1");
zkw1.writeZNode("/1/replication/peers", "test",
conf2.get(ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2");
LOG.info("Setup second Zk");
utility1.startMiniCluster();
utility2.startMiniCluster();
utility1.startMiniMapReduceCluster();
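// Create the same table on both clusters: family 'f' has global replication scope,
// while 'norep' keeps the default scope and is not replicated.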
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
HBaseAdmin admin1 = new HBaseAdmin(conf1);
HBaseAdmin admin2 = new HBaseAdmin(conf2);
admin1.createTable(table);
admin2.createTable(table);
} catch (Exception ex) { ex.printStackTrace(); throw ex; }
}
private static void setIsReplication(String bool) throws Exception {
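// Toggle the /1/replication/state znode; region servers pick up the change through a ZooKeeper watcher.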
zkw1.writeZNode("/1/replication", "state", bool);
// Takes some ms for ZK to fire the watcher
Thread.sleep(100);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility1.shutdownMiniCluster();
utility2.shutdownMiniCluster();
}
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {}
/**
* @throws java.lang.Exception
*/
@After
public void tearDown() throws Exception {
setIsReplication("false");
utility1.truncateTable(tableName);
utility2.truncateTable(tableName);
setIsReplication("true");
}
@Test
public void testReplication() throws Exception {
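// Put a single row on the master, then poll the slave until it is replicated or retries run out.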
Put put = new Put(row);
put.add(famName, row, row);
HTable table1 = new HTable(conf1, tableName);
table1.put(put);
HTable table2 = new HTable(conf2, tableName);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = table2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
break;
}
}
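// Delete the row on the master and wait for the delete to reach the slave.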
Delete del = new Delete(row);
table1.delete(del);
table2 = new HTable(conf2, tableName);
get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
Result res = table2.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
// normal Batch tests
table1.setAutoFlush(false);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
table1.put(put);
}
table1.flushCommits();
Scan scan = new Scan();
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for normal batch replication");
}
ResultScanner scanner = table2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH);
scanner.close();
if (res.length != NB_ROWS_IN_BATCH) {
LOG.info("Only got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
table1.setAutoFlush(true);
// Test stopping replication
setIsReplication("false");
put = new Put(Bytes.toBytes("stop start"));
put.add(famName, row, row);
table1.put(put);
get = new Get(Bytes.toBytes("stop start"));
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
break;
}
Result res = table2.get(get);
if(res.size() >= 1) {
fail("Replication wasn't stopped");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
// Test restart replication
setIsReplication("true");
table1.put(put);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = table2.get(get);
if(res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
break;
}
}
put = new Put(Bytes.toBytes("do not rep"));
put.add(noRepfamName, row, row);
table1.put(put);
get = new Get(Bytes.toBytes("do not rep"));
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES-1) {
break;
}
Result res = table2.get(get);
if (res.size() >= 1) {
fail("Not supposed to be replicated");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
}
@Test
public void testMRCopy() throws Exception {
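// With replication disabled, load rows on the master only and copy family 'f'
// to the slave cluster with the CopyTable MapReduce job.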
setIsReplication("false");
HTable table1 = new HTable(conf1, tableName);
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(famName, row, row);
table1.put(put);
}
String[] args = new String[] {
"--rs.class="+ReplicationRegionInterface.class.getName(),
"--rs.impl="+ReplicationRegionServer.class.getName(),
"--peer.adr="+conf2.get(ZOOKEEPER_QUORUM)+":/2",
"--families=f", "test"};
Job job = CopyTable.createSubmittableJob(conf1, args);
assertTrue(job.waitForCompletion(true));
HTable table2 = new HTable(conf2, tableName);
Scan scan = new Scan();
ResultScanner scanner = table2.getScanner(scan);
Result[] res = scanner.next(NB_ROWS_IN_BATCH);
scanner.close();
assertEquals(NB_ROWS_IN_BATCH, res.length);
}
}

View File

@ -1,22 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.21.0-SNAPSHOT</version>
</parent>
<artifactId>hbase-contrib</artifactId>
<packaging>pom</packaging>
<name>HBase Contrib</name>
<modules>
<!-- Commenting out for the moment. This is being rewritten. Will then pick up new WALEdit hlog value
<module>mdc_replication</module>
St.Ack Fri Mar 19 13:20:15 PDT 2010
-->
</modules>
</project>

View File

@ -16,7 +16,6 @@
<modules>
<module>core</module>
<module>contrib</module>
</modules>
<licenses>

View File

@ -35,24 +35,6 @@
<unpack>false</unpack>
</binaries>
</moduleSet>
<moduleSet>
<includes>
<include>org.apache.hbase:hbase-contrib-mdc_replication</include>
</includes>
<binaries>
<outputDirectory>contrib/mdc_replication/</outputDirectory>
<unpack>false</unpack>
</binaries>
</moduleSet>
<moduleSet>
<includes>
<include>org.apache.hbase:hbase-contrib-mdc_replication</include>
</includes>
<binaries>
<outputDirectory>contrib/mdc_replication/</outputDirectory>
<unpack>false</unpack>
</binaries>
</moduleSet>
</moduleSets>
<fileSets>
<fileSet>