HBASE-5431 Improve delete marker handling in Import M/R jobs
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1290955 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
145d25dfc2
commit
ad14643d55
|
@ -118,25 +118,6 @@ public class Delete extends Mutation
|
||||||
this.writeToWAL = d.writeToWAL;
|
this.writeToWAL = d.writeToWAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Advanced use only. Create a Delete object based on a KeyValue
|
|
||||||
* of type "delete".
|
|
||||||
* @param kv
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public Delete(KeyValue kv) throws IOException {
|
|
||||||
this(kv.getRow(), kv.getTimestamp(), null);
|
|
||||||
if (!kv.isDelete()) {
|
|
||||||
throw new IOException("The recently added KeyValue is not of type "
|
|
||||||
+ "delete. Rowkey: " + Bytes.toStringBinary(this.row));
|
|
||||||
}
|
|
||||||
// can't use singletonList, because this might be modified at the server by
|
|
||||||
// coprocessors
|
|
||||||
ArrayList<KeyValue> list = new ArrayList<KeyValue>(1);
|
|
||||||
list.add(kv);
|
|
||||||
familyMap.put(kv.getFamily(), list);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Advanced use only.
|
* Advanced use only.
|
||||||
* Add an existing delete marker to this Delete object.
|
* Add an existing delete marker to this Delete object.
|
||||||
|
|
|
@ -74,6 +74,7 @@ public class Import {
|
||||||
private void writeResult(ImmutableBytesWritable key, Result result, Context context)
|
private void writeResult(ImmutableBytesWritable key, Result result, Context context)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
Put put = null;
|
Put put = null;
|
||||||
|
Delete delete = null;
|
||||||
for (KeyValue kv : result.raw()) {
|
for (KeyValue kv : result.raw()) {
|
||||||
if(cfRenameMap != null) {
|
if(cfRenameMap != null) {
|
||||||
// If there's a rename mapping for this CF, create a new KeyValue
|
// If there's a rename mapping for this CF, create a new KeyValue
|
||||||
|
@ -95,13 +96,13 @@ public class Import {
|
||||||
kv.getValueLength()); // value length
|
kv.getValueLength()); // value length
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Deletes and Puts are gathered and written when finished
|
||||||
if (kv.isDelete()) {
|
if (kv.isDelete()) {
|
||||||
// Deletes need to be written one-by-one,
|
if (delete == null) {
|
||||||
// since family deletes overwrite column(s) deletes
|
delete = new Delete(key.get());
|
||||||
context.write(key, new Delete(kv));
|
}
|
||||||
|
delete.addDeleteMarker(kv);
|
||||||
} else {
|
} else {
|
||||||
// Puts are gathered into a single Put object
|
|
||||||
// and written when finished
|
|
||||||
if (put == null) {
|
if (put == null) {
|
||||||
put = new Put(key.get());
|
put = new Put(key.get());
|
||||||
}
|
}
|
||||||
|
@ -111,6 +112,9 @@ public class Import {
|
||||||
if (put != null) {
|
if (put != null) {
|
||||||
context.write(key, put);
|
context.write(key, put);
|
||||||
}
|
}
|
||||||
|
if (delete != null) {
|
||||||
|
context.write(key, delete);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue