HBASE-4411 When copying tables/CFs, allow CF names to be changed
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1171880 13f79535-47bb-0310-9956-ffa450edef68
parent d70a6ec716
commit 6a7e3c3b8b
CHANGES.txt
@@ -502,6 +502,8 @@ Release 0.91.0 - Unreleased
    HBASE-4381  Refactor split decisions into a split policy class. (todd)
    HBASE-4373  HBaseAdmin.assign() does not use force flag (Ramkrishna)
    HBASE-4425  Provide access to RpcServer instance from RegionServerServices
+   HBASE-4411  When copying tables/CFs, allow CF names to be changed
+               (David Revell)
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel
org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.mapreduce.Job;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Tool used to copy a table to another one which can be on a different setup.
@@ -67,9 +69,22 @@ public class CopyTable {
     }
     if(families != null) {
       String[] fams = families.split(",");
+      Map<String,String> cfRenameMap = new HashMap<String,String>();
       for(String fam : fams) {
-        scan.addFamily(Bytes.toBytes(fam));
+        String sourceCf;
+        if(fam.contains(":")) {
+          // fam looks like "sourceCfName:destCfName"
+          String[] srcAndDest = fam.split(":", 2);
+          sourceCf = srcAndDest[0];
+          String destCf = srcAndDest[1];
+          cfRenameMap.put(sourceCf, destCf);
+        } else {
+          // fam is just "sourceCf"
+          sourceCf = fam;
+        }
+        scan.addFamily(Bytes.toBytes(sourceCf));
       }
+      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
     }
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Import.Importer.class, null, null, job);
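The hunk above changes the meaning of each comma-separated entry in the --families argument: an entry may now be either a bare CF name or a sourceCfName:destCfName pair. A standalone sketch of that parsing contract follows; the class and variable names are illustrative, not part of the patch:

    import java.util.HashMap;
    import java.util.Map;

    public class FamiliesSpecDemo {
      public static void main(String[] args) {
        String families = "myOldCf:myNewCf,cf2,cf3"; // same shape as --families
        Map<String, String> cfRenameMap = new HashMap<String, String>();
        for (String fam : families.split(",")) {
          String sourceCf;
          if (fam.contains(":")) {
            // Split at the first ':' only; the left side is scanned from the
            // source table, the right side becomes the destination CF name
            String[] srcAndDest = fam.split(":", 2);
            sourceCf = srcAndDest[0];
            cfRenameMap.put(sourceCf, srcAndDest[1]);
          } else {
            sourceCf = fam; // no rename requested for this family
          }
          System.out.println("scan family: " + sourceCf);
        }
        System.out.println("renames: " + cfRenameMap); // {myOldCf=myNewCf}
      }
    }

Note that the scan always reads the source CF name; only the mapper output is renamed.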
@@ -101,7 +116,9 @@ public class CopyTable {
     System.err.println(" new.name     new table's name");
     System.err.println(" peer.adr     Address of the peer cluster given in the format");
     System.err.println("              hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent");
-    System.err.println(" families     comma-seperated list of families to copy");
+    System.err.println(" families     comma-separated list of families to copy");
+    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
+    System.err.println("              To keep the same name, just give \"cfName\"");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" tablename    Name of the table to copy");
@@ -111,7 +128,7 @@ public class CopyTable {
     System.err.println(" $ bin/hbase " +
         "org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " +
         "--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " +
-        "--peer.adr=server1,server2,server3:2181:/hbase TestTable ");
+        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
   }
 
   private static boolean doCommandLine(final String[] args) {
org/apache/hadoop/hbase/mapreduce/Import.java
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -38,12 +41,15 @@ import org.apache.hadoop.util.GenericOptionsParser;
  */
 public class Import {
   final static String NAME = "import";
+  public static final String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
 
   /**
    * Write table content out to files in hdfs.
    */
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Put> {
+    private Map<byte[], byte[]> cfRenameMap;
+
     /**
      * @param row  The current table row key.
      * @param value  The columns.
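The new cfRenameMap field is keyed by byte[], which is why the next hunk builds it as a TreeMap with Bytes.BYTES_COMPARATOR rather than a HashMap: Java arrays use identity-based equals/hashCode, so a HashMap<byte[], byte[]> would never find a family by content. A small demonstration of the pitfall, independent of the patch (the class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.util.Bytes;

    public class ByteArrayKeyDemo {
      public static void main(String[] args) {
        Map<byte[], byte[]> hash = new HashMap<byte[], byte[]>();
        hash.put(Bytes.toBytes("cf"), Bytes.toBytes("newCf"));
        // Identity-based lookup: a fresh byte[] with the same content misses
        System.out.println(hash.get(Bytes.toBytes("cf"))); // null

        Map<byte[], byte[]> tree = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
        tree.put(Bytes.toBytes("cf"), Bytes.toBytes("newCf"));
        // Content-based lookup via the comparator succeeds
        System.out.println(Bytes.toString(tree.get(Bytes.toBytes("cf")))); // newCf
      }
    }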
@@ -63,16 +69,92 @@ public class Import {
       }
     }
 
-    private static Put resultToPut(ImmutableBytesWritable key, Result result)
+    private Put resultToPut(ImmutableBytesWritable key, Result result)
     throws IOException {
       Put put = new Put(key.get());
       for (KeyValue kv : result.raw()) {
+        if(cfRenameMap != null) {
+          // If there's a rename mapping for this CF, create a new KeyValue
+          byte[] newCfName = cfRenameMap.get(kv.getFamily());
+          if(newCfName != null) {
+            kv = new KeyValue(kv.getBuffer(),            // row buffer
+                    kv.getRowOffset(),                   // row offset
+                    kv.getRowLength(),                   // row length
+                    newCfName,                           // CF buffer
+                    0,                                   // CF offset
+                    newCfName.length,                    // CF length
+                    kv.getBuffer(),                      // qualifier buffer
+                    kv.getQualifierOffset(),             // qualifier offset
+                    kv.getQualifierLength(),             // qualifier length
+                    kv.getTimestamp(),                   // timestamp
+                    KeyValue.Type.codeToType(kv.getType()), // KV Type
+                    kv.getBuffer(),                      // value buffer
+                    kv.getValueOffset(),                 // value offset
+                    kv.getValueLength());                // value length
+          }
+        }
         put.add(kv);
       }
       return put;
     }
+
+    @Override
+    public void setup(Context context) {
+      // Make a map from sourceCfName to destCfName by parsing a config key
+      cfRenameMap = null;
+      String allMappingsPropVal = context.getConfiguration().get(CF_RENAME_PROP);
+      if(allMappingsPropVal != null) {
+        // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
+        String[] allMappings = allMappingsPropVal.split(",");
+        for (String mapping: allMappings) {
+          if(cfRenameMap == null) {
+            cfRenameMap = new TreeMap<byte[],byte[]>(Bytes.BYTES_COMPARATOR);
+          }
+          String[] srcAndDest = mapping.split(":");
+          if(srcAndDest.length != 2) {
+            continue;
+          }
+          cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
+        }
+      }
+    }
   }
+
+  /**
+   * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
+   * the mapper how to rename column families.
+   *
+   * <p>Alternately, instead of calling this function, you could set the configuration key
+   * {@link #CF_RENAME_PROP} yourself. The value should look like
+   * <pre>srcCf1:destCf1,srcCf2:destCf2,...</pre>. This would have the same effect on
+   * the mapper behavior.
+   *
+   * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be set
+   * @param renameMap a mapping from source CF names to destination CF names
+   */
+  static public void configureCfRenaming(Configuration conf,
+          Map<String, String> renameMap) {
+    StringBuilder sb = new StringBuilder();
+    for(Map.Entry<String,String> entry: renameMap.entrySet()) {
+      String sourceCf = entry.getKey();
+      String destCf = entry.getValue();
+
+      if(sourceCf.contains(":") || sourceCf.contains(",") ||
+          destCf.contains(":") || destCf.contains(",")) {
+        throw new IllegalArgumentException("Illegal character in CF names: "
+            + sourceCf + ", " + destCf);
+      }
+
+      if(sb.length() != 0) {
+        sb.append(",");
+      }
+      sb.append(sourceCf + ":" + destCf);
+    }
+    conf.set(CF_RENAME_PROP, sb.toString());
+  }
+
 
   /**
    * Sets up the actual job.
    *
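For callers that set up the job programmatically rather than through CopyTable's command line, configureCfRenaming serializes the rename map into the single CF_RENAME_PROP property that Importer.setup() later re-parses in each mapper. A minimal sketch of that round trip, assuming the patched Import class is on the classpath (the demo class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.Import;

    public class CfRenamingDemo {
      public static void main(String[] args) {
        // Stand-in for job.getConfiguration() on a real MapReduce job
        Configuration conf = new Configuration();
        Map<String, String> renameMap = new HashMap<String, String>();
        renameMap.put("myOldCf", "myNewCf"); // copy myOldCf's cells into myNewCf
        Import.configureCfRenaming(conf, renameMap);
        // The helper wrote "myOldCf:myNewCf" under Import.CF_RENAME_PROP
        System.out.println(conf.get(Import.CF_RENAME_PROP));
      }
    }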