commit cfafe13c1f (parent ef20e8a8da)
Author: Jean-Daniel Cryans
Date:   2012-12-19 22:46:07 +00:00

    HBASE-7158 Allow CopyTable to identify the source cluster (for replication scenarios)

    git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1424193 13f79535-47bb-0310-9956-ffa450edef68

2 changed files with 28 additions and 6 deletions

org/apache/hadoop/hbase/client/Mutation.java

@@ -194,6 +194,7 @@ public abstract class Mutation extends OperationWithAttributes implements Row {
    * @param clusterId
    */
   public void setClusterId(UUID clusterId) {
+    if (clusterId == null) return;
     byte[] val = new byte[2*Bytes.SIZEOF_LONG];
     Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
     Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
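
For reference, the two Bytes.putLong calls above lay the UUID out as sixteen big-endian bytes, most-significant half first. A minimal standalone sketch of that encoding and its inverse, written with plain java.nio so it runs without HBase on the classpath (ClusterIdCodec is a made-up name for illustration):

    import java.nio.ByteBuffer;
    import java.util.UUID;

    public class ClusterIdCodec {
      // Pack a UUID into 16 bytes, most-significant half first,
      // mirroring the two Bytes.putLong calls in setClusterId.
      static byte[] encode(UUID clusterId) {
        ByteBuffer buf = ByteBuffer.allocate(16); // big-endian by default
        buf.putLong(clusterId.getMostSignificantBits());
        buf.putLong(clusterId.getLeastSignificantBits());
        return buf.array();
      }

      // Inverse of encode(): read the two halves back into a UUID.
      static UUID decode(byte[] val) {
        ByteBuffer buf = ByteBuffer.wrap(val);
        return new UUID(buf.getLong(), buf.getLong());
      }

      public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(id.equals(decode(encode(id)))); // prints true
      }
    }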

org/apache/hadoop/hbase/mapreduce/Import.java

@@ -21,25 +21,28 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;

 /**
  * Import data written by {@link Export}.
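
Usage note: Import runs as a MapReduce job driven from the command line; the usual invocation for this era of the tool (both arguments are operator-supplied placeholders) is:

    $ bin/hbase org.apache.hadoop.hbase.mapreduce.Import <tablename> <inputdir>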
@@ -50,6 +53,7 @@ public class Import {
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  private static final Log LOG = LogFactory.getLog(Import.class);

   /**
    * A mapper that just writes out KeyValues.
@@ -91,6 +95,7 @@ public class Import {
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
+    private UUID clusterId;

     /**
      * @param row The current table row key.
@@ -131,16 +136,32 @@ public class Import {
         }
       }
       if (put != null) {
+        put.setClusterId(clusterId);
         context.write(key, put);
       }
       if (delete != null) {
+        delete.setClusterId(clusterId);
         context.write(key, delete);
       }
     }

     @Override
     public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      try {
+        HConnection connection = HConnectionManager.getConnection(conf);
+        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        clusterId = zkHelper.getUUIDForCluster(zkw);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.error("Problem connecting to ZooKeeper during task setup", e);
+      } catch (KeeperException e) {
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        LOG.error("Problem setting up task", e);
+      }
     }
   }
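
For context on why the mutations are stamped: in HBase master-master replication each edit carries the UUID of the cluster it originated from, and a replication source skips edits already tagged with its peer's id, which is what breaks replication loops. Tagging the mutations that Import/CopyTable emits with the source cluster's id therefore keeps those edits from being shipped back to the cluster they came from. A minimal client-side sketch of the same tagging, assuming the 0.95-era API shown in this diff (table, family, and qualifier names are placeholders, and a real caller would use the UUID read from ZooKeeper rather than a random one):

    import java.util.UUID;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TaggedPutExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t1");   // "t1" is a placeholder table name
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
        // Tag the edit with the cluster it originated from; a downstream
        // replication source can then avoid shipping it back there.
        put.setClusterId(UUID.randomUUID());     // stand-in for the real cluster id
        table.put(put);
        table.close();
      }
    }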