From 6a7e3c3b8b0d13fc48d8d787cd01eb1e437475da Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 17 Sep 2011 00:38:21 +0000 Subject: [PATCH] HBASE-4411 When copying tables/CFs, allow CF names to be changed git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1171880 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hadoop/hbase/mapreduce/CopyTable.java | 23 ++++- .../apache/hadoop/hbase/mapreduce/Import.java | 84 ++++++++++++++++++- 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 982ccac3dcf..51b57a495a5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -502,6 +502,8 @@ Release 0.91.0 - Unreleased HBASE-4381 Refactor split decisions into a split policy class. (todd) HBASE-4373 HBaseAdmin.assign() does not use force flag (Ramkrishna) HBASE-4425 Provide access to RpcServer instance from RegionServerServices + HBASE-4411 When copying tables/CFs, allow CF names to be changed + (David Revell) TASKS HBASE-3559 Move report of split to master OFF the heartbeat channel diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 339651f4086..86066f77652 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * Tool used to copy a table to another one which can be on a different setup. @@ -67,9 +69,22 @@ public class CopyTable { } if(families != null) { String[] fams = families.split(","); + Map cfRenameMap = new HashMap(); for(String fam : fams) { - scan.addFamily(Bytes.toBytes(fam)); + String sourceCf; + if(fam.contains(":")) { + // fam looks like "sourceCfName:destCfName" + String[] srcAndDest = fam.split(":", 2); + sourceCf = srcAndDest[0]; + String destCf = srcAndDest[1]; + cfRenameMap.put(sourceCf, destCf); + } else { + // fam is just "sourceCf" + sourceCf = fam; + } + scan.addFamily(Bytes.toBytes(sourceCf)); } + Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); } TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.Importer.class, null, null, job); @@ -101,7 +116,9 @@ public class CopyTable { System.err.println(" new.name new table's name"); System.err.println(" peer.adr Address of the peer cluster given in the format"); System.err.println(" hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent"); - System.err.println(" families comma-seperated list of families to copy"); + System.err.println(" families comma-separated list of families to copy"); + System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); + System.err.println(" To keep the same name, just give \"cfName\""); System.err.println(); System.err.println("Args:"); System.err.println(" tablename Name of the table to copy"); @@ -111,7 +128,7 @@ public class CopyTable { System.err.println(" $ bin/hbase " + "org.apache.hadoop.hbase.mapreduce.CopyTable --rs.class=org.apache.hadoop.hbase.ipc.ReplicationRegionInterface " + "--rs.impl=org.apache.hadoop.hbase.regionserver.replication.ReplicationRegionServer --starttime=1265875194289 --endtime=1265878794289 " + - "--peer.adr=server1,server2,server3:2181:/hbase TestTable "); + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable "); } private static boolean doCommandLine(final String[] args) { diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 653de67f6d2..e6213911be4 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -28,6 +30,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -38,12 +41,15 @@ import org.apache.hadoop.util.GenericOptionsParser; */ public class Import { final static String NAME = "import"; + public static final String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS"; /** * Write table content out to files in hdfs. */ static class Importer extends TableMapper { + private Map cfRenameMap; + /** * @param row The current table row key. * @param value The columns. @@ -63,15 +69,91 @@ public class Import { } } - private static Put resultToPut(ImmutableBytesWritable key, Result result) + private Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException { Put put = new Put(key.get()); for (KeyValue kv : result.raw()) { + if(cfRenameMap != null) { + // If there's a rename mapping for this CF, create a new KeyValue + byte[] newCfName = cfRenameMap.get(kv.getFamily()); + if(newCfName != null) { + kv = new KeyValue(kv.getBuffer(), // row buffer + kv.getRowOffset(), // row offset + kv.getRowLength(), // row length + newCfName, // CF buffer + 0, // CF offset + newCfName.length, // CF length + kv.getBuffer(), // qualifier buffer + kv.getQualifierOffset(), // qualifier offset + kv.getQualifierLength(), // qualifier length + kv.getTimestamp(), // timestamp + KeyValue.Type.codeToType(kv.getType()), // KV Type + kv.getBuffer(), // value buffer + kv.getValueOffset(), // value offset + kv.getValueLength()); // value length + } + } put.add(kv); } return put; } + + @Override + public void setup(Context context) { + // Make a map from sourceCfName to destCfName by parsing a config key + cfRenameMap = null; + String allMappingsPropVal = context.getConfiguration().get(CF_RENAME_PROP); + if(allMappingsPropVal != null) { + // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,... + String[] allMappings = allMappingsPropVal.split(","); + for (String mapping: allMappings) { + if(cfRenameMap == null) { + cfRenameMap = new TreeMap(Bytes.BYTES_COMPARATOR); + } + String [] srcAndDest = mapping.split(":"); + if(srcAndDest.length != 2) { + continue; + } + cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes()); + } + } + } } + + /** + *

Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells + * the mapper how to rename column families. + * + *

Alternately, instead of calling this function, you could set the configuration key + * {@link #CF_RENAME_PROP} yourself. The value should look like + *

srcCf1:destCf1,srcCf2:destCf2,....
. This would have the same effect on + * the mapper behavior. + * + * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be + * set + * @param renameMap a mapping from source CF names to destination CF names + */ + static public void configureCfRenaming(Configuration conf, + Map renameMap) { + StringBuilder sb = new StringBuilder(); + for(Map.Entry entry: renameMap.entrySet()) { + String sourceCf = entry.getKey(); + String destCf = entry.getValue(); + + if(sourceCf.contains(":") || sourceCf.contains(",") || + destCf.contains(":") || destCf.contains(",")) { + throw new IllegalArgumentException("Illegal character in CF names: " + + sourceCf + ", " + destCf); + } + + if(sb.length() != 0) { + sb.append(","); + } + sb.append(sourceCf + ":" + destCf); + } + conf.set(CF_RENAME_PROP, sb.toString()); + } + /** * Sets up the actual job.