HBASE-2178 Hooks for replication

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@905834 13f79535-47bb-0310-9956-ffa450edef68

parent d189828325
commit 443686f101
@@ -342,6 +342,7 @@ Release 0.21.0 - Unreleased
    HBASE-1373  Update Thrift to use compact/framed protocol (Lars Francke via Stack)
    HBASE-2172  Add constructor to Put for row key and timestamp
                (Lars Francke via Stack)
+   HBASE-2178  Hooks for replication
 
 NEW FEATURES
    HBASE-1961  HBase EC2 scripts
@@ -93,6 +93,11 @@ public interface HConstants {
   /** Default ZooKeeper pause value. In milliseconds. */
   static final int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
 
+  /** Parameter name for the root dir in ZK for this cluster */
+  static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
+
+  static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
+
   /** Parameter name for port region server listens on. */
   static final String REGIONSERVER_PORT = "hbase.regionserver.port";
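The two new constants let a client root its cluster under an arbitrary znode, which is what allows several HBase clusters to share one ZooKeeper ensemble. A minimal sketch of overriding the parent; the /hbase-backup path is made up for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    Configuration conf = HBaseConfiguration.create();
    // Root this client's cluster under /hbase-backup instead of the
    // default DEFAULT_ZOOKEEPER_ZNODE_PARENT ("/hbase").
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-backup");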
@@ -168,11 +168,12 @@ public class HConnectionManager implements HConstants {
    */
   public static synchronized ClientZKWatcher getClientZooKeeperWatcher(
       Configuration conf) throws IOException {
-    if (!ZK_WRAPPERS.containsKey(conf.get(HConstants.ZOOKEEPER_QUORUM))) {
-      ZK_WRAPPERS.put(conf.get(HConstants.ZOOKEEPER_QUORUM),
+    if (!ZK_WRAPPERS.containsKey(
+        ZooKeeperWrapper.getZookeeperClusterKey(conf))) {
+      ZK_WRAPPERS.put(ZooKeeperWrapper.getZookeeperClusterKey(conf),
           new ClientZKWatcher(conf));
     }
-    return ZK_WRAPPERS.get(conf.get(HConstants.ZOOKEEPER_QUORUM));
+    return ZK_WRAPPERS.get(ZooKeeperWrapper.getZookeeperClusterKey(conf));
   }
 
   /**
@@ -52,6 +52,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -151,6 +153,9 @@ public class HbaseObjectWritable implements Writable, Configurable {
     addToMap(FirstKeyOnlyFilter.class, code++);
 
     addToMap(Delete [].class, code++);
+    addToMap(HLog.Entry.class, code++);
+    addToMap(HLog.Entry[].class, code++);
+    addToMap(HLogKey.class, code++);
   }
 
   private Class<?> declaredClass;
@@ -1177,7 +1177,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       }
 
       if (writeToWAL) {
-        this.log.append(regionInfo.getRegionName(),
+        this.log.append(regionInfo,
           regionInfo.getTableDesc().getName(), kvs, now);
       }
       flush = isFlushSize(size);
@@ -1450,7 +1450,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     try {
       if (writeToWAL) {
         long now = System.currentTimeMillis();
-        this.log.append(regionInfo.getRegionName(),
+        this.log.append(regionInfo,
           regionInfo.getTableDesc().getName(), edits, now);
       }
       long size = 0;
@@ -2355,7 +2355,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       long now = System.currentTimeMillis();
       List<KeyValue> edits = new ArrayList<KeyValue>(1);
       edits.add(newKv);
-      this.log.append(regionInfo.getRegionName(),
+      this.log.append(regionInfo,
         regionInfo.getTableDesc().getName(), edits, now);
     }
 
@@ -1182,7 +1182,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
    * Wait on all threads to finish.
    * Presumption is that all closes and stops have already been called.
    */
-  void join() {
+  protected void join() {
     Threads.shutdown(this.majorCompactionChecker);
     Threads.shutdown(this.workerThread);
     Threads.shutdown(this.cacheFlusher);
@@ -2394,6 +2394,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
     }
   }
 
+  /**
+   * Interval at which threads should run
+   * @return the interval
+   */
+  public int getThreadWakeFrequency() {
+    return threadWakeFrequency;
+  }
+
   /**
    * @param args
    */
@@ -19,6 +19,29 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.Writable;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -42,26 +65,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
-
 /**
  * HLog stores all the edits to the HStore. Its the hbase write-ahead-log
  * implementation.
@@ -125,6 +128,10 @@ public class HLog implements HConstants, Syncable {
 
     Entry next(Entry reuse) throws IOException;
 
+    void seek(long pos) throws IOException;
+
+    long getPosition() throws IOException;
+
   }
 
   public interface Writer {
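seek and getPosition give a log shipper the primitives it needs to tail a WAL: checkpoint the offset it has reached, reopen the file later, and skip straight past edits already sent. A hedged sketch of such a tailer; walPath, savedPosition, and the ship() sink are illustrative, while getReader, next, seek, and getPosition come from this class and this patch:

    // Illustrative tailer: resume reading a WAL from a saved offset.
    HLog.Reader reader = HLog.getReader(fs, walPath, conf);
    reader.seek(savedPosition);              // skip edits already shipped
    HLog.Entry entry;
    while ((entry = reader.next(null)) != null) {
      ship(entry);                           // hypothetical replication sink
      savedPosition = reader.getPosition();  // checkpoint after each entry
    }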
@@ -649,7 +656,7 @@ public class HLog implements HConstants, Syncable {
     // region being flushed is removed if the sequence number of the flush
     // is greater than or equal to the value in lastSeqWritten.
     this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum));
-    doWrite(logKey, logEdit, logKey.getWriteTime());
+    doWrite(regionInfo, logKey, logEdit, logKey.getWriteTime());
     this.unflushedEntries.incrementAndGet();
     this.numEntries.incrementAndGet();
   }
@@ -677,15 +684,16 @@ public class HLog implements HConstants, Syncable {
    * synchronized prevents appends during the completion of a cache flush or for
    * the duration of a log roll.
    *
-   * @param regionName
+   * @param info
    * @param tableName
    * @param edits
    * @param now
    * @throws IOException
    */
-  public void append(byte [] regionName, byte [] tableName, List<KeyValue> edits,
+  public void append(HRegionInfo info, byte [] tableName, List<KeyValue> edits,
     final long now)
   throws IOException {
+    byte[] regionName = info.getRegionName();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
@@ -700,7 +708,7 @@ public class HLog implements HConstants, Syncable {
     int counter = 0;
     for (KeyValue kv: edits) {
       HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
-      doWrite(logKey, kv, now);
+      doWrite(info, logKey, kv, now);
       this.numEntries.incrementAndGet();
     }
@@ -847,7 +855,7 @@ public class HLog implements HConstants, Syncable {
     }
   }
 
-  private void doWrite(HLogKey logKey, KeyValue logEdit, final long now)
+  protected void doWrite(HRegionInfo info, HLogKey logKey, KeyValue logEdit, final long now)
   throws IOException {
     if (!this.enabled) {
       return;
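Widening doWrite from private to protected, and handing it the HRegionInfo, is the actual replication hook: a subclass now sees every edit as it is written. A hedged sketch of what such a subclass could look like; ReplicationHLog and queueForReplication are illustrative names, and the four-argument super constructor (including the LogRollListener type) is assumed from the existing class as used in the tests below:

    // Illustrative only: intercept every WAL write for shipping elsewhere.
    public class ReplicationHLog extends HLog {
      public ReplicationHLog(FileSystem fs, Path dir, Configuration conf,
          LogRollListener listener) throws IOException {
        super(fs, dir, conf, listener);
      }

      @Override
      protected void doWrite(HRegionInfo info, HLogKey logKey, KeyValue logEdit,
          final long now) throws IOException {
        super.doWrite(info, logKey, logEdit, now);  // keep normal WAL behavior
        if (!info.isMetaRegion()) {                 // skip catalog regions
          queueForReplication(logKey, logEdit);     // hypothetical shipping queue
        }
      }

      private void queueForReplication(HLogKey key, KeyValue edit) {
        // hypothetical: hand the edit to a background shipper thread
      }
    }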
@@ -1243,7 +1251,7 @@ public class HLog implements HConstants, Syncable {
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
    */
-  public static class Entry {
+  public static class Entry implements Writable {
     private KeyValue edit;
     private HLogKey key;
@@ -1281,6 +1289,18 @@ public class HLog implements HConstants, Syncable {
     public String toString() {
       return this.key + "=" + this.edit;
     }
 
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      this.key.write(dataOutput);
+      this.edit.write(dataOutput);
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      this.key.readFields(dataInput);
+      this.edit.readFields(dataInput);
+    }
   }
 
   /**
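Entry's Writable implementation, together with its registration in HbaseObjectWritable above, is what lets whole WAL entries travel over HBase RPC from one cluster to another. A hedged round-trip sketch, assuming HbaseObjectWritable's usual static writeObject/readObject helpers, an existing HLog.Entry in hand, and whatever default constructor deserialization needs:

    // Hedged sketch: serialize a WAL entry the way HBase RPC would ship it.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    HbaseObjectWritable.writeObject(new DataOutputStream(buf), entry,
        HLog.Entry.class, conf);

    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    HLog.Entry shipped = (HLog.Entry) HbaseObjectWritable.readObject(in, conf);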
@@ -107,4 +107,14 @@ public class SequenceFileLogReader implements HLog.Reader {
     return null;
   }
 
+  @Override
+  public void seek(long pos) throws IOException {
+    reader.seek(pos);
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    return reader.getPosition();
+  }
+
 }
@@ -67,7 +67,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
   public void sync() throws IOException {
     this.writer.sync();
     if (this.writer_out != null) {
-      this.writer_out.sync();
+      this.writer_out.hflush();
     }
   }
@@ -19,18 +19,6 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,11 +30,23 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.Stat;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
 /**
  * Wraps a ZooKeeper instance and adds HBase specific functionality.
  *
@@ -93,7 +93,8 @@ public class ZooKeeperWrapper implements HConstants {
       throw new IOException(e);
     }
 
-    parentZNode = conf.get("zookeeper.znode.parent", "/hbase");
+    parentZNode = conf.get(ZOOKEEPER_ZNODE_PARENT,
+        DEFAULT_ZOOKEEPER_ZNODE_PARENT);
 
     String rootServerZNodeName = conf.get("zookeeper.znode.rootserver",
         "root-region-server");
@@ -579,18 +580,7 @@ public class ZooKeeperWrapper implements HConstants {
    * @return A list of server addresses
    */
   public List<HServerAddress> scanRSDirectory() {
-    List<HServerAddress> addresses = new ArrayList<HServerAddress>();
-    try {
-      List<String> nodes = zooKeeper.getChildren(rsZNode, false);
-      for (String node : nodes) {
-        addresses.add(readAddress(rsZNode + ZNODE_PATH_SEPARATOR + node, null));
-      }
-    } catch (KeeperException e) {
-      LOG.warn("Failed to read " + rsZNode + " znode in ZooKeeper: " + e);
-    } catch (InterruptedException e) {
-      LOG.warn("Failed to read " + rsZNode + " znode in ZooKeeper: " + e);
-    }
-    return addresses;
+    return scanAddressDirectory(rsZNode, null);
   }
 
   /**
@@ -636,7 +626,7 @@ public class ZooKeeperWrapper implements HConstants {
     }
   }
 
-  private String getZNode(String parentZNode, String znodeName) {
+  public String getZNode(String parentZNode, String znodeName) {
     return znodeName.charAt(0) == ZNODE_PATH_SEPARATOR ?
         znodeName : joinPath(parentZNode, znodeName);
   }
@@ -652,6 +642,90 @@ public class ZooKeeperWrapper implements HConstants {
   public String getMasterElectionZNode() {
     return masterElectionZNode;
   }
 
+  /**
+   * Get the path of the parent ZNode
+   * @return path of that znode
+   */
+  public String getParentZNode() {
+    return parentZNode;
+  }
+
+  /**
+   * Scan a directory of address data.
+   * @param znode The parent node
+   * @param watcher The watcher to put on the found znodes, if not null
+   * @return The directory contents
+   */
+  public List<HServerAddress> scanAddressDirectory(String znode,
+      Watcher watcher) {
+    List<HServerAddress> list = new ArrayList<HServerAddress>();
+    List<String> nodes = this.listZnodes(znode, watcher);
+    if(nodes == null) {
+      return list;
+    }
+    for (String node : nodes) {
+      String path = joinPath(znode, node);
+      list.add(readAddress(path, watcher));
+    }
+    return list;
+  }
+
+  public List<String> listZnodes(String znode, Watcher watcher) {
+    List<String> nodes = null;
+    try {
+      if (checkExistenceOf(znode)) {
+        nodes = zooKeeper.getChildren(znode, watcher);
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+    } catch (InterruptedException e) {
+      LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+    }
+    return nodes;
+  }
+
+  public String getData(String parentZNode, String znode) {
+    return getDataAndWatch(parentZNode, znode, null);
+  }
+
+  public String getDataAndWatch(String parentZNode,
+      String znode, Watcher watcher) {
+    String data = null;
+    try {
+      String path = joinPath(parentZNode, znode);
+      if (checkExistenceOf(path)) {
+        data = Bytes.toString(zooKeeper.getData(path, watcher, null));
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+    } catch (InterruptedException e) {
+      LOG.warn("Failed to read " + znode + " znode in ZooKeeper: " + e);
+    }
+    return data;
+  }
+
+  public void writeZNode(String parentPath, String child, String strData)
+      throws InterruptedException, KeeperException {
+    String path = joinPath(parentPath, child);
+    if (!ensureExists(parentPath)) {
+      LOG.error("unable to ensure parent exists: " + parentPath);
+    }
+    byte[] data = Bytes.toBytes(strData);
+    try {
+      this.zooKeeper.create(path, data,
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      LOG.debug("Created " + path);
+    } catch (KeeperException.NodeExistsException ex) {
+      this.zooKeeper.setData(path, data, -1);
+      LOG.debug("Updated " + path);
+    }
+  }
+
+  public static String getZookeeperClusterKey(Configuration conf) {
+    return conf.get(ZOOKEEPER_QUORUM)+":"+
+        conf.get(ZOOKEEPER_ZNODE_PARENT);
+  }
+
 }
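getZookeeperClusterKey is what HConnectionManager (earlier in this patch) now keys its watcher cache on: quorum plus parent znode uniquely names a cluster, so two clusters sharing one ensemble no longer collide in the cache. A sketch of the resulting keys, with made-up host and znode values:

    Configuration master = HBaseConfiguration.create();
    master.set(HConstants.ZOOKEEPER_QUORUM, "zk1.example.com");
    master.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");

    Configuration slave = HBaseConfiguration.create();
    slave.set(HConstants.ZOOKEEPER_QUORUM, "zk1.example.com");
    slave.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-slave");

    // Same quorum, distinct cache keys:
    //   "zk1.example.com:/hbase"  vs  "zk1.example.com:/hbase-slave"
    String key = ZooKeeperWrapper.getZookeeperClusterKey(master);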
@@ -62,14 +62,22 @@ import org.apache.zookeeper.ZooKeeper;
 public class HBaseTestingUtility {
 
   private final Log LOG = LogFactory.getLog(getClass());
 
-  private final Configuration conf = HBaseConfiguration.create();
+  private final Configuration conf;
   private MiniZooKeeperCluster zkCluster = null;
   private MiniDFSCluster dfsCluster = null;
   private MiniHBaseCluster hbaseCluster = null;
   private MiniMRCluster mrCluster = null;
   private File clusterTestBuildDir = null;
   private HBaseAdmin hbaseAdmin = null;
 
+  public HBaseTestingUtility() {
+    this(HBaseConfiguration.create());
+  }
+
+  public HBaseTestingUtility(Configuration conf) {
+    this.conf = conf;
+  }
+
   /** System property key to get test directory value.
    */
@@ -106,6 +114,15 @@ public class HBaseTestingUtility {
     startMiniCluster(1);
   }
 
+  public void startMiniZKCluster() throws Exception {
+    // Note that this is done before we create the MiniHBaseCluster because we
+    // need to edit the config to add the ZooKeeper servers.
+    this.zkCluster = new MiniZooKeeperCluster();
+    int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
+    this.conf.set("hbase.zookeeper.property.clientPort",
+        Integer.toString(clientPort));
+  }
+
   /**
    * Start up a minicluster of hbase, optinally dfs, and zookeeper.
    * Modifies Configuration. Homes the cluster data directory under a random
@@ -156,12 +173,10 @@ public class HBaseTestingUtility {
     this.conf.set("fs.defaultFS", fs.getUri().toString());
     this.dfsCluster.waitClusterUp();
 
-    // Note that this is done before we create the MiniHBaseCluster because we
-    // need to edit the config to add the ZooKeeper servers.
-    this.zkCluster = new MiniZooKeeperCluster();
-    int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
-    this.conf.set("hbase.zookeeper.property.clientPort",
-        Integer.toString(clientPort));
+    // It could be created before the cluster
+    if(this.zkCluster == null) {
+      startMiniZKCluster();
+    }
 
     // Now do the mini hbase cluster. Set the hbase.rootdir in config.
     Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory());
@@ -304,6 +319,23 @@ public class HBaseTestingUtility {
     return new HTable(getConfiguration(), tableName);
   }
 
+  /**
+   * Provide an existing table name to truncate
+   * @param tableName existing table
+   * @return HTable to that new table
+   * @throws IOException
+   */
+  public HTable truncateTable(byte [] tableName) throws IOException {
+    HTable table = new HTable(getConfiguration(), tableName);
+    Scan scan = new Scan();
+    ResultScanner resScan = table.getScanner(scan);
+    for(Result res : resScan) {
+      Delete del = new Delete(res.getRow());
+      table.delete(del);
+    }
+    return table;
+  }
+
   /**
    * Load table with rows from 'aaa' to 'zzz'.
    * @param t Table
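truncateTable empties a table through the client API rather than disabling and recreating it, so the table stays online and keeps its regions. A brief illustrative use; the table name is made up:

    HBaseTestingUtility util = new HBaseTestingUtility();
    // ...cluster started and a table created and loaded elsewhere...
    HTable table = util.truncateTable(Bytes.toBytes("testtable"));
    // table is now empty but still online with the same regions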
@@ -575,4 +607,16 @@ public class HBaseTestingUtility {
     HRegionLocation hrl = table.getRegionLocation(row);
     closeRegion(hrl.getRegionInfo().getRegionName());
   }
+
+  public MiniZooKeeperCluster getZkCluster() {
+    return zkCluster;
+  }
+
+  public void setZkCluster(MiniZooKeeperCluster zkCluster) {
+    this.zkCluster = zkCluster;
+  }
+
+  public MiniDFSCluster getDFSCluster() {
+    return dfsCluster;
+  }
 }
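Taken together, the Configuration constructor, startMiniZKCluster, and the zkCluster accessors let a test stand up two mini clusters that share one ZooKeeper ensemble under different parent znodes, which is exactly the topology a replication test needs. A hedged sketch; the /1 and /2 parents are illustrative:

    Configuration conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    Configuration conf2 = HBaseConfiguration.create();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    HBaseTestingUtility utility1 = new HBaseTestingUtility(conf1);
    HBaseTestingUtility utility2 = new HBaseTestingUtility(conf2);

    utility1.startMiniZKCluster();                  // one shared ensemble...
    utility2.setZkCluster(utility1.getZkCluster()); // ...reused by the second cluster

    utility1.startMiniCluster();
    utility2.startMiniCluster();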
@@ -115,7 +115,7 @@ public class TestStoreReconstruction {
       byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
       edit.add(new KeyValue(rowName, family, qualifier,
           System.currentTimeMillis(), column));
-      log.append(regionName, tableName, edit,
+      log.append(info, tableName, edit,
           System.currentTimeMillis());
       edit.clear();
     }
@@ -126,7 +126,7 @@ public class TestStoreReconstruction {
     // Add an edit to another family, should be skipped.
     edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
         System.currentTimeMillis(), rowName));
-    log.append(regionName, tableName, edit,
+    log.append(info, tableName, edit,
         System.currentTimeMillis());
     log.sync();
 
@@ -19,23 +19,23 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /** JUnit test case for HLog */
 public class TestHLog extends HBaseTestCase implements HConstants {
   private Path dir;
@@ -43,8 +43,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
 
   @Override
   public void setUp() throws Exception {
-    // Enable append for these tests.
-    this.conf.setBoolean("dfs.support.append", true);
     // Make block sizes small.
     this.conf.setInt("dfs.blocksize", 1024 * 1024);
     this.conf.setInt("hbase.regionserver.flushlogentries", 1);
@@ -74,14 +72,21 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    * @throws IOException
    */
   public void testSplit() throws IOException {
+
     final byte [] tableName = Bytes.toBytes(getName());
     final byte [] rowName = tableName;
     HLog log = new HLog(this.fs, this.dir, this.conf, null);
     final int howmany = 3;
+    HRegionInfo[] infos = new HRegionInfo[3];
+    for(int i = 0; i < howmany; i++) {
+      infos[i] = new HRegionInfo(new HTableDescriptor(tableName),
+          Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
+    }
     // Add edits for three regions.
     try {
       for (int ii = 0; ii < howmany; ii++) {
         for (int i = 0; i < howmany; i++) {
+
           for (int j = 0; j < howmany; j++) {
             List<KeyValue> edit = new ArrayList<KeyValue>();
             byte [] family = Bytes.toBytes("column");
@@ -90,10 +95,11 @@ public class TestHLog extends HBaseTestCase implements HConstants {
             edit.add(new KeyValue(rowName, family, qualifier,
                 System.currentTimeMillis(), column));
             System.out.println("Region " + i + ": " + edit);
-            log.append(Bytes.toBytes("" + i), tableName, edit,
+            log.append(infos[i], tableName, edit,
                 System.currentTimeMillis());
           }
         }
+        log.hflush();
         log.rollWriter();
       }
       List<Path> splits =
@@ -128,10 +134,14 @@ public class TestHLog extends HBaseTestCase implements HConstants {
     Path subdir = new Path(this.dir, "hlogdir");
     HLog wal = new HLog(this.fs, subdir, this.conf, null);
     final int total = 20;
+
+    HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes),
+        null,null, false);
+
     for (int i = 0; i < total; i++) {
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
-      wal.append(bytes, bytes, kvs, System.currentTimeMillis());
+      wal.append(info, bytes, kvs, System.currentTimeMillis());
     }
     // Now call sync and try reading. Opening a Reader before you sync just
     // gives you EOFE.
@@ -149,7 +159,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
     for (int i = 0; i < total; i++) {
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
-      wal.append(bytes, bytes, kvs, System.currentTimeMillis());
+      wal.append(info, bytes, kvs, System.currentTimeMillis());
     }
     reader = HLog.getReader(fs, walPath, conf);
     count = 0;
@@ -168,7 +178,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
     for (int i = 0; i < total; i++) {
       List<KeyValue> kvs = new ArrayList<KeyValue>();
       kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
-      wal.append(bytes, bytes, kvs, System.currentTimeMillis());
+      wal.append(info, bytes, kvs, System.currentTimeMillis());
     }
     // Now I should have written out lots of blocks. Sync then read.
     wal.sync();
@@ -248,7 +258,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    */
   public void testEditAdd() throws IOException {
     final int COL_COUNT = 10;
-    final byte [] regionName = Bytes.toBytes("regionname");
     final byte [] tableName = Bytes.toBytes("tablename");
     final byte [] row = Bytes.toBytes("row");
     HLog.Reader reader = null;
@@ -263,7 +272,10 @@ public class TestHLog extends HBaseTestCase implements HConstants {
           Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
     }
-    log.append(regionName, tableName, cols, System.currentTimeMillis());
+    HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
+        row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
+    final byte [] regionName = info.getRegionName();
+    log.append(info, tableName, cols, System.currentTimeMillis());
     long logSeqId = log.startCacheFlush();
     log.completeCacheFlush(regionName, tableName, logSeqId);
     log.close();