HBASE-12820 Use table lock instead of MobZookeeper. (Jingcheng Du)

parent 1b800f7d4c
commit fbbb3249d9

MobConstants.java

@@ -72,6 +72,7 @@ public class MobConstants {
   public static final long DEFAULT_MOB_CACHE_EVICT_PERIOD = 3600l;
 
   public final static String TEMP_DIR_NAME = ".tmp";
+  public final static byte[] MOB_TABLE_LOCK_SUFFIX = Bytes.toBytes(".mobLock");
   public final static String EMPTY_STRING = "";
 
   private MobConstants() {
MobUtils.java

@@ -273,16 +273,6 @@ public class MobUtils {
     LOG.info(deletedFileCount + " expired mob files are deleted");
   }
 
-  /**
-   * Gets the znode name of column family.
-   * @param tableName The current table name.
-   * @param familyName The name of the current column family.
-   * @return The znode name of column family.
-   */
-  public static String getColumnFamilyZNodeName(String tableName, String familyName) {
-    return tableName + ":" + familyName;
-  }
-
   /**
    * Gets the root dir of the mob files.
    * It's {HBASE_DIR}/mobdir.
@@ -548,4 +538,15 @@ public class MobUtils {
     return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
       cell.getValueLength() - Bytes.SIZEOF_INT);
   }
+
+  /**
+   * Gets the table name used in the table lock.
+   * The table lock name is a dummy one, it's not a real table name. It's tableName + ".mobLock".
+   * @param tn The table name.
+   * @return The table name used in the table lock.
+   */
+  public static TableName getTableLockName(TableName tn) {
+    byte[] tableName = tn.getName();
+    return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
+  }
 }
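The lock name returned above never names a real table; the ".mobLock" suffix simply gives each table a private key for TableLockManager to hang its lock znodes on. A minimal usage sketch, assuming a TableLockManager has already been created as in SweepJob below (the variable names here are illustrative):

    // Derive the dummy lock-table name: "t1" -> "t1.mobLock".
    TableName lockName = MobUtils.getTableLockName(TableName.valueOf("t1"));
    // Take the exclusive lock under that name; acquire() blocks until it is granted.
    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
    lock.acquire();
    try {
      // section that must not overlap with mob major compactions
    } finally {
      lock.release();
    }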
MobZookeeper.java (deleted)

@@ -1,270 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * The zookeeper used for MOB.
- * This zookeeper is used to synchronize the HBase major compaction and sweep tool.
- * The structure of the nodes for mob in zookeeper:
- * |--baseNode
- *     |--MOB
- *         |--tableName:columnFamilyName-lock // locks for the mob column family
- *         |--tableName:columnFamilyName-sweeper // when a sweep tool runs, such a node is added
- *         |--tableName:columnFamilyName-majorCompaction
- *             |--UUID // when a major compaction occurs, such a node is added.
- * In order to synchronize the operations between the sweep tool and HBase major compaction, these
- * actions need to acquire the tableName:columnFamilyName-lock before the sweep tool and major
- * compaction run.
- * In the sweep tool:
- * 1. If it acquires the lock successfully, it checks whether the sweeper node exists; if the
- * node exists, the current run is aborted. If not, it checks whether there are major compaction
- * nodes; if yes, the current run is aborted, if not it adds a sweeper node to the zookeeper.
- * 2. If it could not obtain the lock, the current run is aborted.
- * In the HBase compaction:
- * 1. If it's a minor compaction, continue the compaction.
- * 2. If it's a major compaction, it acquires a lock in zookeeper.
- *    A. If it obtains the lock, it checks whether there's a sweeper node; if yes, it converts
- *    itself to a minor one and continues; if not, it adds a major compaction node to the
- *    zookeeper.
- *    B. If it could not obtain the lock, it converts itself to a minor one and continues the
- *    compaction.
- */
-@InterfaceAudience.Private
-public class MobZookeeper {
-  // TODO Will remove this class before the mob is merged back to master.
-  private static final Log LOG = LogFactory.getLog(MobZookeeper.class);
-
-  private ZooKeeperWatcher zkw;
-  private String mobZnode;
-  private static final String LOCK_EPHEMERAL = "-lock";
-  private static final String SWEEPER_EPHEMERAL = "-sweeper";
-  private static final String MAJOR_COMPACTION_EPHEMERAL = "-majorCompaction";
-
-  private MobZookeeper(Configuration conf, String identifier) throws IOException,
-      KeeperException {
-    this.zkw = new ZooKeeperWatcher(conf, identifier, new DummyMobAbortable());
-    mobZnode = ZKUtil.joinZNode(zkw.baseZNode, "MOB");
-    if (ZKUtil.checkExists(zkw, mobZnode) == -1) {
-      ZKUtil.createWithParents(zkw, mobZnode);
-    }
-  }
-
-  /**
-   * Creates a new instance of MobZookeeper.
-   * @param conf The current configuration.
-   * @param identifier string that is passed to RecoverableZookeeper to be used as
-   *          identifier for this instance.
-   * @return A new instance of MobZookeeper.
-   * @throws IOException
-   * @throws KeeperException
-   */
-  public static MobZookeeper newInstance(Configuration conf, String identifier) throws IOException,
-      KeeperException {
-    return new MobZookeeper(conf, identifier);
-  }
-
-  /**
-   * Acquires a lock on the current column family.
-   * All threads that try to access the column family acquire a lock, which actually creates an
-   * ephemeral node in the zookeeper.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return True if the lock is obtained successfully. Otherwise false is returned.
-   */
-  public boolean lockColumnFamily(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    boolean locked = false;
-    try {
-      locked = ZKUtil.createEphemeralNodeAndWatch(zkw,
-          ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL), null);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(locked ? "Locked the column family " + znodeName
-            : "Can not lock the column family " + znodeName);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Fail to lock the column family " + znodeName, e);
-    }
-    return locked;
-  }
-
-  /**
-   * Releases the lock on the current column family.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   */
-  public void unlockColumnFamily(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Unlocking the column family " + znodeName);
-    }
-    try {
-      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + LOCK_EPHEMERAL));
-    } catch (KeeperException e) {
-      LOG.warn("Fail to unlock the column family " + znodeName, e);
-    }
-  }
-
-  /**
-   * Adds a node to zookeeper which indicates that a sweep tool is running.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @param data the data of the ephemeral node.
-   * @return True if the node is created successfully. Otherwise false is returned.
-   */
-  public boolean addSweeperZNode(String tableName, String familyName, byte[] data) {
-    boolean add = false;
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    try {
-      add = ZKUtil.createEphemeralNodeAndWatch(zkw,
-          ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL), data);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(add ? "Added a znode for sweeper " + znodeName
-            : "Cannot add a znode for sweeper " + znodeName);
-      }
-    } catch (KeeperException e) {
-      LOG.error("Fail to add a znode for sweeper " + znodeName, e);
-    }
-    return add;
-  }
-
-  /**
-   * Gets the path of the sweeper znode in zookeeper.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return The path of the sweeper znode in zookeeper.
-   */
-  public String getSweeperZNodePath(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    return ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL);
-  }
-
-  /**
-   * Deletes the node from zookeeper which indicates that a sweep tool is finished.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   */
-  public void deleteSweeperZNode(String tableName, String familyName) {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    try {
-      ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL));
-    } catch (KeeperException e) {
-      LOG.error("Fail to delete a znode for sweeper " + znodeName, e);
-    }
-  }
-
-  /**
-   * Checks whether the sweeper znode exists in the Zookeeper.
-   * If the node exists, it means a sweep tool is running.
-   * Otherwise, it is not.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return True if the node exists. Otherwise false is returned.
-   * @throws KeeperException
-   */
-  public boolean isSweeperZNodeExist(String tableName, String familyName) throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    return ZKUtil.checkExists(zkw, ZKUtil.joinZNode(mobZnode, znodeName + SWEEPER_EPHEMERAL)) >= 0;
-  }
-
-  /**
-   * Checks whether there are major compaction nodes in the zookeeper.
-   * If there are such nodes, it means there are major compactions in progress now.
-   * Otherwise there are not.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @return True if there are major compactions in progress. Otherwise false is returned.
-   * @throws KeeperException
-   */
-  public boolean hasMajorCompactionChildren(String tableName, String familyName)
-      throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
-    List<String> children = ZKUtil.listChildrenNoWatch(zkw, mcPath);
-    return children != null && !children.isEmpty();
-  }
-
-  /**
-   * Creates a node of a major compaction in the Zookeeper.
-   * Before an HBase major compaction, such a node is created in the Zookeeper. It tells others
-   * that a major compaction is in progress and the sweep tool cannot run at this time.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @param compactionName The current compaction name.
-   * @return True if the node is created successfully. Otherwise false is returned.
-   * @throws KeeperException
-   */
-  public boolean addMajorCompactionZNode(String tableName, String familyName,
-      String compactionName) throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
-    ZKUtil.createNodeIfNotExistsAndWatch(zkw, mcPath, null);
-    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
-    return ZKUtil.createEphemeralNodeAndWatch(zkw, eachMcPath, null);
-  }
-
-  /**
-   * Deletes a major compaction node from the Zookeeper.
-   * @param tableName The current table name.
-   * @param familyName The current column family name.
-   * @param compactionName The current compaction name.
-   * @throws KeeperException
-   */
-  public void deleteMajorCompactionZNode(String tableName, String familyName,
-      String compactionName) throws KeeperException {
-    String znodeName = MobUtils.getColumnFamilyZNodeName(tableName, familyName);
-    String mcPath = ZKUtil.joinZNode(mobZnode, znodeName + MAJOR_COMPACTION_EPHEMERAL);
-    String eachMcPath = ZKUtil.joinZNode(mcPath, compactionName);
-    ZKUtil.deleteNode(zkw, eachMcPath);
-  }
-
-  /**
-   * Closes the MobZookeeper.
-   */
-  public void close() {
-    this.zkw.close();
-  }
-
-  /**
-   * A dummy abortable, used for the MobZookeeper.
-   */
-  public static class DummyMobAbortable implements Abortable {
-
-    private boolean abort = false;
-
-    public void abort(String why, Throwable e) {
-      abort = true;
-    }
-
-    public boolean isAborted() {
-      return abort;
-    }
-
-  }
-}
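Everything the deleted class did with hand-rolled ephemeral znodes (a check-then-act sequence guarded by a per-column-family lock znode) collapses into a single table-lock acquisition in the rest of this patch. A rough before/after sketch, using only the method names visible in this diff:

    // Before: several ZooKeeper round trips, racy without the extra lock znode.
    if (zk.lockColumnFamily(table, family)) {
      try {
        if (!zk.isSweeperZNodeExist(table, family)) {
          zk.addSweeperZNode(table, family, data);  // mark a sweep as running
        }
      } finally {
        zk.unlockColumnFamily(table, family);
      }
    }

    // After: one lock expresses the same mutual exclusion for the whole table.
    TableLock lock = tableLockManager.writeLock(MobUtils.getTableLockName(tn), "Run sweep tool");
    lock.acquire();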
SweepJob.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob.mapreduce;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -37,32 +38,39 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.serializer.JavaSerialization;
 import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 
@@ -77,21 +85,23 @@ public class SweepJob {
   private final FileSystem fs;
   private final Configuration conf;
   private static final Log LOG = LogFactory.getLog(SweepJob.class);
-  static final String SWEEP_JOB_ID = "mob.compaction.id";
-  static final String SWEEPER_NODE = "mob.compaction.sweep.node";
-  static final String WORKING_DIR_KEY = "mob.compaction.dir";
-  static final String WORKING_ALLNAMES_FILE_KEY = "mob.compaction.all.file";
-  static final String WORKING_VISITED_DIR_KEY = "mob.compaction.visited.dir";
+  static final String SWEEP_JOB_ID = "mob.sweep.job.id";
+  static final String SWEEP_JOB_SERVERNAME = "mob.sweep.job.servername";
+  static final String SWEEP_JOB_TABLE_NODE = "mob.sweep.job.table.node";
+  static final String WORKING_DIR_KEY = "mob.sweep.job.dir";
+  static final String WORKING_ALLNAMES_FILE_KEY = "mob.sweep.job.all.file";
+  static final String WORKING_VISITED_DIR_KEY = "mob.sweep.job.visited.dir";
   static final String WORKING_ALLNAMES_DIR = "all";
   static final String WORKING_VISITED_DIR = "visited";
-  public static final String WORKING_FILES_DIR_KEY = "mob.compaction.files.dir";
-  //the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
-  public static final String MOB_COMPACTION_DELAY = "hbase.mob.compaction.delay";
+  public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
+  //the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+  public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
   protected static long ONE_DAY = 24 * 60 * 60 * 1000;
   private long compactionStartTime = EnvironmentEdgeManager.currentTime();
   public final static String CREDENTIALS_LOCATION = "credentials_location";
   private CacheConfig cacheConfig;
   static final int SCAN_CACHING = 10000;
+  private TableLockManager tableLockManager;
 
   public SweepJob(Configuration conf, FileSystem fs) {
     this.conf = conf;
@@ -102,6 +112,22 @@ public class SweepJob {
     cacheConfig = new CacheConfig(copyOfConf);
   }
 
+  static ServerName getCurrentServerName(Configuration conf) throws IOException {
+    String hostname = conf.get(
+      "hbase.regionserver.ipc.address",
+      Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+        conf.get("hbase.regionserver.dns.interface", "default"),
+        conf.get("hbase.regionserver.dns.nameserver", "default"))));
+    int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
+    // Creation of a HSA will force a resolve.
+    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+    if (initialIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+    }
+    return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
+      EnvironmentEdgeManager.currentTime());
+  }
+
   /**
    * Runs MapReduce to do the sweeping on the mob files.
    * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
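getCurrentServerName exists because the sweep tool runs as a standalone MapReduce client, not inside a region server, yet TableLockManager stamps a ServerName into every lock znode as the owner. The synthesized name (local host, region-server port, current time as start code) is what the MR tasks later compare against the lock owner. A small usage sketch, assuming the standalone-tool context:

    Configuration conf = HBaseConfiguration.create();
    // host + region-server port + a fresh start code (timestamp)
    ServerName serverName = SweepJob.getCurrentServerName(conf);
    // The tasks need the exact same string to recognize their own lock:
    conf.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());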
@@ -141,37 +167,21 @@
     }
     String familyName = family.getNameAsString();
     String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
-    MobZookeeper zk = MobZookeeper.newInstance(conf, id);
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
+    ServerName serverName = getCurrentServerName(conf);
+    tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
+    TableName lockName = MobUtils.getTableLockName(tn);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    String tableName = tn.getNameAsString();
+    // Try to obtain the lock. Use this lock to synchronize all the query
     try {
-      // Try to obtain the lock. Use this lock to synchronize all the query, creation/deletion
-      // in the Zookeeper.
-      if (!zk.lockColumnFamily(tn.getNameAsString(), familyName)) {
-        LOG.warn("Can not lock the store " + familyName
-          + ". The major compaction in HBase may be in-progress. Please re-run the job.");
-        return 3;
-      }
-      // Checks whether there're HBase major compaction now.
-      boolean hasChildren = zk.hasMajorCompactionChildren(tn.getNameAsString(), familyName);
-      if (hasChildren) {
-        LOG.warn("The major compaction in HBase may be in-progress."
-          + " Please re-run the job.");
-        return 4;
-      } else {
-        // Checks whether there's sweep tool in progress.
-        boolean hasSweeper = zk.isSweeperZNodeExist(tn.getNameAsString(), familyName);
-        if (hasSweeper) {
-          LOG.warn("Another sweep job is running");
-          return 5;
-        } else {
-          // add the sweeper node, mark that there's one sweep tool in progress.
-          // All the HBase major compaction and sweep tool in this column family could not
-          // run until this sweep tool is finished.
-          zk.addSweeperZNode(tn.getNameAsString(), familyName, Bytes.toBytes(id));
-        }
-      }
-    } finally {
-      zk.unlockColumnFamily(tn.getNameAsString(), familyName);
+      lock.acquire();
+    } catch (Exception e) {
+      LOG.warn("Can not lock the table " + tableName
+        + ". The major compaction in HBase may be in-progress or another sweep job is running."
+        + " Please re-run the job.");
+      return 3;
     }
     Job job = null;
     try {
@@ -186,7 +196,9 @@
       conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
         JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
       conf.set(SWEEP_JOB_ID, id);
-      conf.set(SWEEPER_NODE, zk.getSweeperZNodePath(tn.getNameAsString(), familyName));
+      conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
+      String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
+      conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
       job = prepareJob(tn, familyName, scan, conf);
       job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
       // Record the compaction start time.
@@ -204,14 +216,21 @@
         removeUnusedFiles(job, tn, family);
       } else {
         System.err.println("Job Failed");
-        return 6;
+        return 4;
       }
     } finally {
-      cleanup(job, tn, familyName);
-      zk.deleteSweeperZNode(tn.getNameAsString(), familyName);
+      try {
+        cleanup(job, tn, familyName);
+      } finally {
+        try {
+          lock.release();
+        } catch (IOException e) {
+          LOG.error("Fail to release the table lock " + tableName, e);
+        }
+      }
     }
   } finally {
-    zk.close();
+    zkw.close();
   }
   return 0;
 }
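The nested finally blocks above encode a deliberate teardown order: the working directory is cleaned before the table lock is released, so no competing sweep job can start against leftover files, and the ZooKeeper watcher is closed last because releasing the lock still needs it. The skeleton of the pattern, reduced to its shape:

    try {
      // submit and wait for the MR job
    } finally {
      try {
        cleanup(job, tn, familyName);  // 1. remove the working dir first
      } finally {
        lock.release();                // 2. only then let others in
      }
    }
    // 3. zkw.close() happens in the enclosing finally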
@@ -305,7 +324,7 @@
     // archive them.
     FileStatus[] files = fs.listStatus(mobStorePath);
     Set<String> fileNames = new TreeSet<String>();
-    long mobCompactionDelay = job.getConfiguration().getLong(MOB_COMPACTION_DELAY, ONE_DAY);
+    long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
     for (FileStatus fileStatus : files) {
       if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
         if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
@@ -422,9 +441,8 @@
    * Deletes the working directory.
    * @param job The current job.
    * @param familyName The family to cleanup
-   * @throws IOException
    */
-  private void cleanup(Job job, TableName tn, String familyName) throws IOException {
+  private void cleanup(Job job, TableName tn, String familyName) {
     if (job != null) {
       // delete the working directory
       Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
@@ -563,4 +581,18 @@
    */
   RECORDS_UPDATED,
   }
+
+  public static class DummyMobAbortable implements Abortable {
+
+    private boolean abort = false;
+
+    public void abort(String why, Throwable e) {
+      abort = true;
+    }
+
+    public boolean isAborted() {
+      return abort;
+    }
+
+  }
 }
SweepJobNodeTracker.java

@@ -18,8 +18,14 @@
  */
 package org.apache.hadoop.hbase.mob.mapreduce;
 
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -36,38 +42,58 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Private
 public class SweepJobNodeTracker extends ZooKeeperListener {
 
-  private String node;
-  private String sweepJobId;
+  private String parentNode;
+  private String lockNodePrefix;
+  private String owner;
+  private String lockNode;
 
-  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String node, String sweepJobId) {
+  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) {
     super(watcher);
-    this.node = node;
-    this.sweepJobId = sweepJobId;
+    this.parentNode = parentNode;
+    this.owner = owner;
+    this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-");
   }
 
   /**
    * Registers the watcher on the sweep job node.
-   * If there's no such a sweep job node, or it's not created by the sweep job that
-   * owns the current MR, the current process will be aborted.
+   * This assumes the table lock uses the Zookeeper. It's a workaround and only used
+   * in the sweep tool, and the sweep tool will be removed after the mob file compaction
+   * is finished.
    */
   public void start() throws KeeperException {
     watcher.registerListener(this);
-    if (ZKUtil.watchAndCheckExists(watcher, node)) {
-      byte[] data = ZKUtil.getDataAndWatch(watcher, node);
-      if (data != null) {
-        if (!sweepJobId.equals(Bytes.toString(data))) {
-          System.exit(1);
-        }
-      } else {
-        System.exit(1);
+    List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode);
+    if (children != null && !children.isEmpty()) {
+      // there are locks
+      TreeSet<String> sortedChildren = new TreeSet<String>();
+      sortedChildren.addAll(children);
+      // find all the write locks
+      SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix);
+      if (!tails.isEmpty()) {
+        for (String tail : tails) {
+          String path = ZKUtil.joinZNode(parentNode, tail);
+          byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+          TableLock lock = TableLockManager.fromBytes(data);
+          ServerName serverName = lock.getLockOwner();
+          org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf(
+            serverName.getHostName(), serverName.getPort(), serverName.getStartCode());
+          // compare the server names (host, port and start code) to make sure the lock was
+          // created by the current sweep job
+          if (owner.equals(sn.toString())) {
+            lockNode = path;
+            return;
+          }
+        }
       }
     }
+    System.exit(1);
   }
 
   @Override
   public void nodeDeleted(String path) {
-    // If the ephemeral node is deleted, abort the current process.
-    if (node.equals(path)) {
+    // If the lock node is deleted, abort the current process.
+    if (path.equals(lockNode)) {
       System.exit(1);
     }
   }
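The tracker finds "its" lock purely by owner identity: children of the table's lock znode whose names start with "write-" are queued write locks, and the protobuf stored in each one names the requesting server. A condensed view of the matching step above (writeLockChildren and expectedOwner stand in for the real locals):

    for (String child : writeLockChildren) {                  // e.g. "write-0000000001"
      byte[] data = ZKUtil.getDataAndWatch(watcher, ZKUtil.joinZNode(parentNode, child));
      TableLock lock = TableLockManager.fromBytes(data);      // deserialize the lock protobuf
      ServerName ownerProto = lock.getLockOwner();
      String ownerString = org.apache.hadoop.hbase.ServerName.valueOf(
          ownerProto.getHostName(), ownerProto.getPort(), ownerProto.getStartCode()).toString();
      if (ownerString.equals(expectedOwner)) {
        // this znode is our lock; watch it and abort if it ever disappears
      }
    }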
SweepMapper.java

@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -45,11 +45,12 @@ public class SweepMapper extends TableMapper<Text, KeyValue> {
   protected void setup(Context context) throws IOException,
       InterruptedException {
     String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
-    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
     zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
         new DummyMobAbortable());
     try {
-      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, id);
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
       tracker.start();
     } catch (KeeperException e) {
       throw new IOException(e);
SweepReducer.java

@@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper.DummyMobAbortable;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
@@ -102,8 +102,8 @@ public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
   protected void setup(Context context) throws IOException, InterruptedException {
     this.conf = context.getConfiguration();
     this.fs = FileSystem.get(conf);
-    // the MOB_COMPACTION_DELAY is ONE_DAY by default. Its value is only changed when testing.
-    mobCompactionDelay = conf.getLong(SweepJob.MOB_COMPACTION_DELAY, SweepJob.ONE_DAY);
+    // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
+    mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
     String tableName = conf.get(TableInputFormat.INPUT_TABLE);
     String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
     TableName tn = TableName.valueOf(tableName);
@@ -125,7 +125,7 @@
     }
     // disable the block cache.
     Configuration copyOfConf = new Configuration(conf);
-    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.00001f);
+    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
     this.cacheConfig = new CacheConfig(copyOfConf);
 
     table = new HTable(this.conf, Bytes.toBytes(tableName));
@@ -148,12 +148,13 @@
   @Override
   public void run(Context context) throws IOException, InterruptedException {
     String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
-    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEPER_NODE);
+    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
+    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
         new DummyMobAbortable());
     FSDataOutputStream fout = null;
     try {
-      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, jobId);
+      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
       tracker.start();
       setup(context);
       // create a sequence file that contains all the visited file names in this reducer.
HMobStore.java

@@ -46,17 +46,17 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFile;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * The store implementation to save MOBs (medium objects), it extends the HStore.
@@ -91,6 +91,8 @@ public class HMobStore extends HStore {
   private volatile long mobScanCellsSize = 0;
   private List<Path> mobDirLocations;
   private HColumnDescriptor family;
+  private TableLockManager tableLockManager;
+  private TableName tableLockName;
 
   public HMobStore(final HRegion region, final HColumnDescriptor family,
       final Configuration confParam) throws IOException {
@@ -105,6 +107,10 @@ public class HMobStore extends HStore {
     TableName tn = region.getTableDesc().getTableName();
     mobDirLocations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
       .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
+    if (region.getRegionServerServices() != null) {
+      tableLockManager = region.getRegionServerServices().getTableLockManager();
+      tableLockName = MobUtils.getTableLockName(getTableName());
+    }
   }
 
   /**
@@ -425,59 +431,40 @@
       // compaction as retainDeleteMarkers and continue the compaction.
       // 1.2.2. If the node is not there, add a child to the major compaction node, and
       // run the compaction directly.
       String compactionName = UUID.randomUUID().toString().replaceAll("-", "");
-      MobZookeeper zk = null;
-      try {
-        zk = MobZookeeper.newInstance(region.getBaseConf(), compactionName);
-      } catch (KeeperException e) {
-        LOG.error("Cannot connect to the zookeeper, forcing the delete markers to be retained", e);
-        compaction.getRequest().forceRetainDeleteMarkers();
-        return super.compact(compaction);
+      TableLock lock = null;
+      if (tableLockManager != null) {
+        lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
       }
-      boolean keepDeleteMarkers = true;
-      boolean majorCompactNodeAdded = false;
-      try {
-        // try to acquire the operation lock.
-        if (zk.lockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString())) {
-          try {
-            LOG.info("Obtain the lock for the store[" + this
-              + "], ready to perform the major compaction");
-            // check the sweeping node to find out whether the sweeping is in progress.
-            boolean hasSweeper = zk.isSweeperZNodeExist(getTableName().getNameAsString(),
-              getFamily().getNameAsString());
-            if (!hasSweeper) {
-              // if not, add a child to the major compaction node of this store.
-              majorCompactNodeAdded = zk.addMajorCompactionZNode(getTableName().getNameAsString(),
-                getFamily().getNameAsString(), compactionName);
-              // If we failed to add the major compact node, go with keep delete markers mode.
-              keepDeleteMarkers = !majorCompactNodeAdded;
-            }
-          } catch (Exception e) {
-            LOG.error("Fail to handle the Zookeeper", e);
-          } finally {
-            // release the operation lock
-            zk.unlockColumnFamily(getTableName().getNameAsString(), getFamily().getNameAsString());
-          }
-        }
+      boolean tableLocked = false;
+      String tableName = getTableName().getNameAsString();
+      if (lock != null) {
         try {
-          if (keepDeleteMarkers) {
-            LOG.warn("Cannot obtain the lock or a sweep tool is running on this store[" + this
-              + "], forcing the delete markers to be retained");
-            compaction.getRequest().forceRetainDeleteMarkers();
-          }
-          return super.compact(compaction);
-        } finally {
-          if (majorCompactNodeAdded) {
-            try {
-              zk.deleteMajorCompactionZNode(getTableName().getNameAsString(), getFamily()
-                .getNameAsString(), compactionName);
-            } catch (KeeperException e) {
-              LOG.error("Fail to delete the compaction znode" + compactionName, e);
-            }
-          }
+          LOG.info("Start to acquire a read lock for the table[" + tableName
+            + "], ready to perform the major compaction");
+          lock.acquire();
+          tableLocked = true;
+        } catch (Exception e) {
+          LOG.error("Fail to lock the table " + tableName, e);
+        }
+      } else {
+        // If the tableLockManager is null, mark the tableLocked as true.
+        tableLocked = true;
+      }
+      try {
+        if (!tableLocked) {
+          LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
+            + tableName + "], forcing the delete markers to be retained");
+          compaction.getRequest().forceRetainDeleteMarkers();
+        }
+        return super.compact(compaction);
+      } finally {
+        if (tableLocked && lock != null) {
+          try {
+            lock.release();
+          } catch (IOException e) {
+            LOG.error("Fail to release the table lock " + tableName, e);
+          }
         }
-      } finally {
-        zk.close();
       }
     } else {
       // If it's not a major compaction, continue the compaction.
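The lock modes are asymmetric on purpose: each region's major compaction takes a shared read lock, so compactions of different regions of the same table still run concurrently, while the sweep tool's exclusive write lock (see SweepJob above) keeps them all out at once. Failing to get the lock degrades the compaction to retaining delete markers rather than failing it. The two call sites, side by side:

    // region server side: shared among compactions of the same table
    TableLock compactionLock =
        tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
    // sweep tool side: exclusive, conflicts with every read lock
    TableLock sweepLock = tableLockManager.writeLock(lockName, "Run sweep tool");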
TestMobSweepMapper.java

@@ -17,15 +17,27 @@
  */
 package org.apache.hadoop.hbase.mob.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.junit.AfterClass;
@@ -35,9 +47,6 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
 @Category(SmallTests.class)
 public class TestMobSweepMapper {
 
@@ -71,30 +80,41 @@ public class TestMobSweepMapper {
     when(columns.raw()).thenReturn(kvList);
 
     Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName tn = TableName.valueOf("testSweepMapper");
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
     configuration.set(SweepJob.SWEEP_JOB_ID, "1");
-    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepMapper:family-sweeper");
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
 
-    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
-    zk.addSweeperZNode("testSweepMapper", "family", Bytes.toBytes("1"));
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+      serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
+        mock(Mapper.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      SweepMapper map = new SweepMapper();
+      doAnswer(new Answer<Void>() {
 
-    Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
-      mock(Mapper.Context.class);
-    when(ctx.getConfiguration()).thenReturn(configuration);
-    SweepMapper map = new SweepMapper();
-    doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        Text text = (Text) invocation.getArguments()[0];
-        KeyValue kv = (KeyValue) invocation.getArguments()[1];
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          Text text = (Text) invocation.getArguments()[0];
+          KeyValue kv = (KeyValue) invocation.getArguments()[1];
+          assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName);
+          assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
 
-        assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName);
-        assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
-        return null;
-      }
-    }).when(ctx).write(any(Text.class), any(KeyValue.class));
-
-    map.map(r, columns, ctx);
+          return null;
+        }
+      }).when(ctx).write(any(Text.class), any(KeyValue.class));
+
+      map.map(r, columns, ctx);
+    } finally {
+      lock.release();
+    }
   }
 }
TestMobSweepReducer.java

@@ -36,16 +36,21 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.MobZookeeper;
+import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -126,11 +131,11 @@ public class TestMobSweepReducer {
   @Test
   public void testRun() throws Exception {
 
+    TableName tn = TableName.valueOf(tableName);
     byte[] mobValueBytes = new byte[100];
 
     //get the path where mob files lie in
-    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(),
-      TableName.valueOf(tableName), family);
+    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
 
     Put put = new Put(Bytes.toBytes(row));
     put.add(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
@@ -139,7 +144,7 @@
     table.put(put);
     table.put(put2);
     table.flushCommits();
-    admin.flush(TableName.valueOf(tableName));
+    admin.flush(tn);
 
     FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
     //check the generation of a mob file
@@ -159,34 +164,42 @@
     configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
       System.currentTimeMillis() + 24 * 3600 * 1000);
 
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
+    TableName lockName = MobUtils.getTableLockName(tn);
+    String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
     configuration.set(SweepJob.SWEEP_JOB_ID, "1");
-    configuration.set(SweepJob.SWEEPER_NODE, "/hbase/MOB/testSweepReducer:family-sweeper");
+    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
+    ServerName serverName = SweepJob.getCurrentServerName(configuration);
+    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
 
-    MobZookeeper zk = MobZookeeper.newInstance(configuration, "1");
-    zk.addSweeperZNode(tableName, family, Bytes.toBytes("1"));
+    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
+      serverName);
+    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
+    lock.acquire();
+    try {
+      // use the same counter when mocking
+      Counter counter = new GenericCounter();
+      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
+      when(ctx.getConfiguration()).thenReturn(configuration);
+      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
+      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
+      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
 
-    //use the same counter when mocking
-    Counter counter = new GenericCounter();
-    Reducer<Text, KeyValue, Writable, Writable>.Context ctx =
-      mock(Reducer.Context.class);
-    when(ctx.getConfiguration()).thenReturn(configuration);
-    when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
-    when(ctx.nextKey()).thenReturn(true).thenReturn(false);
-    when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
+      byte[] refBytes = Bytes.toBytes(mobFile1);
+      long valueLength = refBytes.length;
+      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
+      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
+        KeyValue.Type.Put, newValue);
+      List<KeyValue> list = new ArrayList<KeyValue>();
+      list.add(kv2);
 
-    byte[] refBytes = Bytes.toBytes(mobFile1);
-    long valueLength = refBytes.length;
-    byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
-    KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
-      Bytes.toBytes(qf), 1, KeyValue.Type.Put, newValue);
-    List<KeyValue> list = new ArrayList<KeyValue>();
-    list.add(kv2);
-
-    when(ctx.getValues()).thenReturn(list);
-
-    SweepReducer reducer = new SweepReducer();
-    reducer.run(ctx);
+      when(ctx.getValues()).thenReturn(list);
+
+      SweepReducer reducer = new SweepReducer();
+      reducer.run(ctx);
+    } finally {
+      lock.release();
+    }
     FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
     String mobFile2 = filsStatuses2[0].getPath().getName();
     //new mob file is generated, old one has been archived
@@ -194,7 +207,7 @@ public class TestMobSweepReducer {
     assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
 
     //test sequence file
-    String workingPath = configuration.get("mob.compaction.visited.dir");
+    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
     FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
     Set<String> files = new TreeSet<String>();
     for (FileStatus st : statuses) {
TestMobSweeper.java

@@ -180,7 +180,7 @@ public class TestMobSweeper {
 
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 24 * 60 * 60 * 1000);
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
 
     String[] args = new String[2];
     args[0] = tableName;
@@ -260,7 +260,7 @@ public class TestMobSweeper {
 
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setLong(SweepJob.MOB_COMPACTION_DELAY, 0);
+    conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
 
     String[] args = new String[2];
     args[0] = tableName;
TestMobCompaction.java

@@ -224,62 +224,6 @@ public class TestMobCompaction {
       countMobCellsInMetadata());
   }
 
-  /**
-   * Tests the major compaction when the zk is not connected.
-   * After that the major compaction will be marked as retainDeleteMarkers, and the delete
-   * markers will be retained.
-   * @throws Exception
-   */
-  @Test
-  public void testMajorCompactionWithZKError() throws Exception {
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    // use the wrong zk settings
-    conf.setInt("zookeeper.recovery.retry", 0);
-    conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 100);
-    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
-      conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181) - 1);
-    init(conf, 200);
-    byte[] dummyData = makeDummyData(300); // larger than mob threshold
-    HRegionIncommon loader = new HRegionIncommon(region);
-    byte[] deleteRow = Bytes.toBytes(0);
-    for (int i = 0; i < compactionThreshold - 1; i++) {
-      Put p = new Put(Bytes.toBytes(i));
-      p.setDurability(Durability.SKIP_WAL);
-      p.add(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
-      loader.put(p);
-      loader.flushcache();
-    }
-    Delete delete = new Delete(deleteRow);
-    delete.deleteFamily(COLUMN_FAMILY);
-    region.delete(delete);
-    loader.flushcache();
-
-    assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
-    region.compactStores(true);
-    assertEquals("After compaction: store files", 1, countStoreFiles());
-
-    Scan scan = new Scan();
-    scan.setRaw(true);
-    InternalScanner scanner = region.getScanner(scan);
-    List<Cell> results = new ArrayList<Cell>();
-    scanner.next(results);
-    int deleteCount = 0;
-    while (!results.isEmpty()) {
-      for (Cell c : results) {
-        if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-          deleteCount++;
-          assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
-        }
-      }
-      results.clear();
-      scanner.next(results);
-    }
-    // assert the delete mark is retained, the major compaction is marked as
-    // retainDeleteMarkers.
-    assertEquals(1, deleteCount);
-    scanner.close();
-  }
-
   @Test
   public void testMajorCompactionAfterDelete() throws Exception {
     init(UTIL.getConfiguration(), 100);