HBASE-4682 Support deleted rows using Import/Export (Lars H)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1212181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
14a266fb6c
commit
e167c8ab28
@ -118,6 +118,25 @@ public class Delete extends Mutation
|
|||||||
this.writeToWAL = d.writeToWAL;
|
this.writeToWAL = d.writeToWAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Advanced use only. Create a Delete object based on a KeyValue
|
||||||
|
* of type "delete".
|
||||||
|
* @param kv
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Delete(KeyValue kv) throws IOException {
|
||||||
|
this(kv.getRow(), kv.getTimestamp(), null);
|
||||||
|
if (!kv.isDelete()) {
|
||||||
|
throw new IOException("The recently added KeyValue is not of type "
|
||||||
|
+ "delete. Rowkey: " + Bytes.toStringBinary(this.row));
|
||||||
|
}
|
||||||
|
// can't use singletonList, because this might be modified at the server by
|
||||||
|
// coprocessors
|
||||||
|
ArrayList<KeyValue> list = new ArrayList<KeyValue>(1);
|
||||||
|
list.add(kv);
|
||||||
|
familyMap.put(kv.getFamily(), list);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete all versions of all columns of the specified family.
|
* Delete all versions of all columns of the specified family.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||||
import org.apache.hadoop.hbase.io.TimeRange;
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
@ -165,6 +164,9 @@ public class Scan extends OperationWithAttributes implements Writable {
|
|||||||
addFamily(fam);
|
addFamily(fam);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (Map.Entry<String, byte[]> attr : scan.getAttributesMap().entrySet()) {
|
||||||
|
setAttribute(attr.getKey(), attr.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,6 +48,7 @@ import org.apache.commons.logging.LogFactory;
|
|||||||
public class Export {
|
public class Export {
|
||||||
private static final Log LOG = LogFactory.getLog(Export.class);
|
private static final Log LOG = LogFactory.getLog(Export.class);
|
||||||
final static String NAME = "export";
|
final static String NAME = "export";
|
||||||
|
final static String RAW_SCAN="hbase.mapreduce.include.deleted.rows";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mapper.
|
* Mapper.
|
||||||
@ -115,6 +116,11 @@ public class Export {
|
|||||||
// Set cache blocks
|
// Set cache blocks
|
||||||
s.setCacheBlocks(false);
|
s.setCacheBlocks(false);
|
||||||
// Set Scan Column Family
|
// Set Scan Column Family
|
||||||
|
boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
|
||||||
|
if (raw) {
|
||||||
|
s.setRaw(raw);
|
||||||
|
}
|
||||||
|
|
||||||
if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
|
if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
|
||||||
s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
|
s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
|
||||||
}
|
}
|
||||||
@ -124,8 +130,8 @@ public class Export {
|
|||||||
LOG.info("Setting Scan Filter for Export.");
|
LOG.info("Setting Scan Filter for Export.");
|
||||||
s.setFilter(exportFilter);
|
s.setFilter(exportFilter);
|
||||||
}
|
}
|
||||||
LOG.info("verisons=" + versions + ", starttime=" + startTime +
|
LOG.info("versions=" + versions + ", starttime=" + startTime +
|
||||||
", endtime=" + endTime);
|
", endtime=" + endTime + ", keepDeletedCells=" + raw);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,6 +165,7 @@ public class Export {
|
|||||||
System.err.println(" Additionally, the following SCAN properties can be specified");
|
System.err.println(" Additionally, the following SCAN properties can be specified");
|
||||||
System.err.println(" to control/limit what is exported..");
|
System.err.println(" to control/limit what is exported..");
|
||||||
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
|
System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
|
||||||
|
System.err.println(" -D " + RAW_SCAN + "=true");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
@ -47,7 +49,7 @@ public class Import {
|
|||||||
* Write table content out to files in hdfs.
|
* Write table content out to files in hdfs.
|
||||||
*/
|
*/
|
||||||
static class Importer
|
static class Importer
|
||||||
extends TableMapper<ImmutableBytesWritable, Put> {
|
extends TableMapper<ImmutableBytesWritable, Mutation> {
|
||||||
private Map<byte[], byte[]> cfRenameMap;
|
private Map<byte[], byte[]> cfRenameMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -63,15 +65,15 @@ public class Import {
|
|||||||
Context context)
|
Context context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
context.write(row, resultToPut(row, value));
|
writeResult(row, value, context);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Put resultToPut(ImmutableBytesWritable key, Result result)
|
private void writeResult(ImmutableBytesWritable key, Result result, Context context)
|
||||||
throws IOException {
|
throws IOException, InterruptedException {
|
||||||
Put put = new Put(key.get());
|
Put put = null;
|
||||||
for (KeyValue kv : result.raw()) {
|
for (KeyValue kv : result.raw()) {
|
||||||
if(cfRenameMap != null) {
|
if(cfRenameMap != null) {
|
||||||
// If there's a rename mapping for this CF, create a new KeyValue
|
// If there's a rename mapping for this CF, create a new KeyValue
|
||||||
@ -93,9 +95,22 @@ public class Import {
|
|||||||
kv.getValueLength()); // value length
|
kv.getValueLength()); // value length
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
put.add(kv);
|
if (kv.isDelete()) {
|
||||||
|
// Deletes need to be written one-by-one,
|
||||||
|
// since family deletes overwrite column(s) deletes
|
||||||
|
context.write(key, new Delete(kv));
|
||||||
|
} else {
|
||||||
|
// Puts are gathered into a single Put object
|
||||||
|
// and written when finished
|
||||||
|
if (put == null) {
|
||||||
|
put = new Put(key.get());
|
||||||
|
}
|
||||||
|
put.add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (put != null) {
|
||||||
|
context.write(key, put);
|
||||||
}
|
}
|
||||||
return put;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -0,0 +1,217 @@
|
|||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestImportExport {
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
private static final byte[] ROW1 = Bytes.toBytes("row1");
|
||||||
|
private static final byte[] ROW2 = Bytes.toBytes("row2");
|
||||||
|
private static final String FAMILYA_STRING = "a";
|
||||||
|
private static final String FAMILYB_STRING = "b";
|
||||||
|
private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
|
||||||
|
private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
|
||||||
|
private static final byte[] QUAL = Bytes.toBytes("q");
|
||||||
|
private static final String OUTPUT_DIR = "outputdir";
|
||||||
|
|
||||||
|
private static MiniHBaseCluster cluster;
|
||||||
|
private static long now = System.currentTimeMillis();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void beforeClass() throws Exception {
|
||||||
|
cluster = UTIL.startMiniCluster();
|
||||||
|
UTIL.startMiniMapReduceCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
UTIL.shutdownMiniMapReduceCluster();
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@After
|
||||||
|
public void cleanup() throws Exception {
|
||||||
|
FileSystem fs = FileSystem.get(UTIL.getConfiguration());
|
||||||
|
fs.delete(new Path(OUTPUT_DIR), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test simple replication case with column mapping
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSimpleCase() throws Exception {
|
||||||
|
String EXPORT_TABLE = "exportSimpleCase";
|
||||||
|
HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA);
|
||||||
|
Put p = new Put(ROW1);
|
||||||
|
p.add(FAMILYA, QUAL, now, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+1, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+2, QUAL);
|
||||||
|
t.put(p);
|
||||||
|
p = new Put(ROW2);
|
||||||
|
p.add(FAMILYA, QUAL, now, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+1, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+2, QUAL);
|
||||||
|
t.put(p);
|
||||||
|
|
||||||
|
String[] args = new String[] {
|
||||||
|
EXPORT_TABLE,
|
||||||
|
OUTPUT_DIR,
|
||||||
|
"1000"
|
||||||
|
};
|
||||||
|
|
||||||
|
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||||
|
Configuration conf = opts.getConfiguration();
|
||||||
|
args = opts.getRemainingArgs();
|
||||||
|
|
||||||
|
Job job = Export.createSubmittableJob(conf, args);
|
||||||
|
job.waitForCompletion(false);
|
||||||
|
assertTrue(job.isSuccessful());
|
||||||
|
|
||||||
|
|
||||||
|
String IMPORT_TABLE = "importTableSimpleCase";
|
||||||
|
t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB);
|
||||||
|
args = new String[] {
|
||||||
|
"-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
|
||||||
|
IMPORT_TABLE,
|
||||||
|
OUTPUT_DIR
|
||||||
|
};
|
||||||
|
|
||||||
|
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||||
|
conf = opts.getConfiguration();
|
||||||
|
args = opts.getRemainingArgs();
|
||||||
|
|
||||||
|
job = Import.createSubmittableJob(conf, args);
|
||||||
|
job.waitForCompletion(false);
|
||||||
|
assertTrue(job.isSuccessful());
|
||||||
|
|
||||||
|
Get g = new Get(ROW1);
|
||||||
|
g.setMaxVersions();
|
||||||
|
Result r = t.get(g);
|
||||||
|
assertEquals(3, r.size());
|
||||||
|
g = new Get(ROW2);
|
||||||
|
g.setMaxVersions();
|
||||||
|
r = t.get(g);
|
||||||
|
assertEquals(3, r.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithDeletes() throws Exception {
|
||||||
|
String EXPORT_TABLE = "exportWithDeletes";
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(EXPORT_TABLE);
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILYA,
|
||||||
|
HColumnDescriptor.DEFAULT_MIN_VERSIONS,
|
||||||
|
5, /* versions */
|
||||||
|
true /* keep deleted cells */,
|
||||||
|
HColumnDescriptor.DEFAULT_COMPRESSION,
|
||||||
|
HColumnDescriptor.DEFAULT_IN_MEMORY,
|
||||||
|
HColumnDescriptor.DEFAULT_BLOCKCACHE,
|
||||||
|
HColumnDescriptor.DEFAULT_BLOCKSIZE,
|
||||||
|
HColumnDescriptor.DEFAULT_TTL,
|
||||||
|
HColumnDescriptor.DEFAULT_BLOOMFILTER,
|
||||||
|
HConstants.REPLICATION_SCOPE_LOCAL));
|
||||||
|
UTIL.getHBaseAdmin().createTable(desc);
|
||||||
|
HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
|
||||||
|
|
||||||
|
Put p = new Put(ROW1);
|
||||||
|
p.add(FAMILYA, QUAL, now, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+1, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+2, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+3, QUAL);
|
||||||
|
p.add(FAMILYA, QUAL, now+4, QUAL);
|
||||||
|
t.put(p);
|
||||||
|
|
||||||
|
Delete d = new Delete(ROW1, now+3, null);
|
||||||
|
t.delete(d);
|
||||||
|
d = new Delete(ROW1);
|
||||||
|
d.deleteColumns(FAMILYA, QUAL, now+2);
|
||||||
|
t.delete(d);
|
||||||
|
|
||||||
|
String[] args = new String[] {
|
||||||
|
"-D" + Export.RAW_SCAN + "=true",
|
||||||
|
EXPORT_TABLE,
|
||||||
|
OUTPUT_DIR,
|
||||||
|
"1000"
|
||||||
|
};
|
||||||
|
|
||||||
|
GenericOptionsParser opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||||
|
Configuration conf = opts.getConfiguration();
|
||||||
|
args = opts.getRemainingArgs();
|
||||||
|
|
||||||
|
Job job = Export.createSubmittableJob(conf, args);
|
||||||
|
job.waitForCompletion(false);
|
||||||
|
assertTrue(job.isSuccessful());
|
||||||
|
|
||||||
|
|
||||||
|
String IMPORT_TABLE = "importWithDeletes";
|
||||||
|
desc = new HTableDescriptor(IMPORT_TABLE);
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILYA,
|
||||||
|
HColumnDescriptor.DEFAULT_MIN_VERSIONS,
|
||||||
|
5, /* versions */
|
||||||
|
true /* keep deleted cells */,
|
||||||
|
HColumnDescriptor.DEFAULT_COMPRESSION,
|
||||||
|
HColumnDescriptor.DEFAULT_IN_MEMORY,
|
||||||
|
HColumnDescriptor.DEFAULT_BLOCKCACHE,
|
||||||
|
HColumnDescriptor.DEFAULT_BLOCKSIZE,
|
||||||
|
HColumnDescriptor.DEFAULT_TTL,
|
||||||
|
HColumnDescriptor.DEFAULT_BLOOMFILTER,
|
||||||
|
HConstants.REPLICATION_SCOPE_LOCAL));
|
||||||
|
UTIL.getHBaseAdmin().createTable(desc);
|
||||||
|
t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
|
||||||
|
args = new String[] {
|
||||||
|
IMPORT_TABLE,
|
||||||
|
OUTPUT_DIR
|
||||||
|
};
|
||||||
|
|
||||||
|
opts = new GenericOptionsParser(new Configuration(cluster.getConfiguration()), args);
|
||||||
|
conf = opts.getConfiguration();
|
||||||
|
args = opts.getRemainingArgs();
|
||||||
|
|
||||||
|
job = Import.createSubmittableJob(conf, args);
|
||||||
|
job.waitForCompletion(false);
|
||||||
|
assertTrue(job.isSuccessful());
|
||||||
|
|
||||||
|
Scan s = new Scan();
|
||||||
|
s.setMaxVersions();
|
||||||
|
s.setRaw(true);
|
||||||
|
ResultScanner scanner = t.getScanner(s);
|
||||||
|
Result r = scanner.next();
|
||||||
|
KeyValue[] res = r.raw();
|
||||||
|
assertTrue(res[0].isDeleteFamily());
|
||||||
|
assertEquals(now+4, res[1].getTimestamp());
|
||||||
|
assertEquals(now+3, res[2].getTimestamp());
|
||||||
|
assertTrue(res[3].isDelete());
|
||||||
|
assertEquals(now+2, res[4].getTimestamp());
|
||||||
|
assertEquals(now+1, res[5].getTimestamp());
|
||||||
|
assertEquals(now, res[6].getTimestamp());
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user