HBASE-2615 M/R on bulk imported tables
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@951742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d380a628bf
commit
493ccf01e8
|
@ -374,6 +374,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2662 TestScannerResource.testScannerResource broke in trunk
|
HBASE-2662 TestScannerResource.testScannerResource broke in trunk
|
||||||
HBASE-2667 TestHLog.testSplit failing in trunk (Cosmin and Stack)
|
HBASE-2667 TestHLog.testSplit failing in trunk (Cosmin and Stack)
|
||||||
HBASE-2614 killing server in TestMasterTransitions causes NPEs and test deadlock
|
HBASE-2614 killing server in TestMasterTransitions causes NPEs and test deadlock
|
||||||
|
HBASE-2615 M/R on bulk imported tables
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1760 Cleanup TODOs in HTable
|
HBASE-1760 Cleanup TODOs in HTable
|
||||||
|
|
|
@ -797,8 +797,13 @@ public class KeyValue implements Writable, HeapSize {
|
||||||
HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG) == 0;
|
HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param now Time to set into <code>this</code> IFF timestamp ==
|
||||||
|
* {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
|
||||||
|
* @return True is we modified this.
|
||||||
|
*/
|
||||||
public boolean updateLatestStamp(final byte [] now) {
|
public boolean updateLatestStamp(final byte [] now) {
|
||||||
if(this.isLatestTimestamp()) {
|
if (this.isLatestTimestamp()) {
|
||||||
int tsOffset = getTimestampOffset();
|
int tsOffset = getTimestampOffset();
|
||||||
System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
|
System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -85,6 +85,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
|
||||||
private final Map<byte [], WriterLength> writers =
|
private final Map<byte [], WriterLength> writers =
|
||||||
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
|
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
|
||||||
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
|
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
|
||||||
|
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
|
||||||
|
|
||||||
public void write(ImmutableBytesWritable row, KeyValue kv)
|
public void write(ImmutableBytesWritable row, KeyValue kv)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -108,6 +109,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
|
||||||
((wl.written == 0)? "": ", wrote=" + wl.written));
|
((wl.written == 0)? "": ", wrote=" + wl.written));
|
||||||
wl.written = 0;
|
wl.written = 0;
|
||||||
}
|
}
|
||||||
|
kv.updateLatestStamp(this.now);
|
||||||
wl.writer.append(kv);
|
wl.writer.append(kv);
|
||||||
wl.written += length;
|
wl.written += length;
|
||||||
// Copy the row so we know when a row transition.
|
// Copy the row so we know when a row transition.
|
||||||
|
|
|
@ -1523,18 +1523,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
||||||
* @return <code>true</code> when updating the time stamp completed.
|
* @return <code>true</code> when updating the time stamp completed.
|
||||||
*/
|
*/
|
||||||
private boolean updateKeys(List<KeyValue> keys, byte [] now) {
|
private boolean updateKeys(List<KeyValue> keys, byte [] now) {
|
||||||
if(keys == null || keys.isEmpty()) {
|
if (keys == null || keys.isEmpty()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for(KeyValue key : keys) {
|
for (KeyValue key : keys) {
|
||||||
if(key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
|
key.updateLatestStamp(now);
|
||||||
key.updateLatestStamp(now);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// /*
|
// /*
|
||||||
// * Utility method to verify values length.
|
// * Utility method to verify values length.
|
||||||
// * @param batchUpdate The update to verify
|
// * @param batchUpdate The update to verify
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -44,6 +46,9 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.Mapper;
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -128,6 +133,51 @@ public class TestHFileOutputFormat {
|
||||||
job.setMapOutputValueClass(KeyValue.class);
|
job.setMapOutputValueClass(KeyValue.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
|
||||||
|
* passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
|
||||||
|
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_LATEST_TIMESTAMP_isReplaced()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
Configuration conf = new Configuration(this.util.getConfiguration());
|
||||||
|
RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
|
||||||
|
TaskAttemptContext context = null;
|
||||||
|
Path dir =
|
||||||
|
HBaseTestingUtility.getTestDir("test_LATEST_TIMESTAMP_isReplaced");
|
||||||
|
try {
|
||||||
|
Job job = new Job(conf);
|
||||||
|
FileOutputFormat.setOutputPath(job, dir);
|
||||||
|
context = new TaskAttemptContext(job.getConfiguration(),
|
||||||
|
new TaskAttemptID());
|
||||||
|
HFileOutputFormat hof = new HFileOutputFormat();
|
||||||
|
writer = hof.getRecordWriter(context);
|
||||||
|
final byte [] b = Bytes.toBytes("b");
|
||||||
|
|
||||||
|
// Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
|
||||||
|
// changed by call to write. Check all in kv is same but ts.
|
||||||
|
KeyValue kv = new KeyValue(b, b, b);
|
||||||
|
KeyValue original = kv.clone();
|
||||||
|
writer.write(new ImmutableBytesWritable(), kv);
|
||||||
|
assertFalse(original.equals(kv));
|
||||||
|
assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
|
||||||
|
assertTrue(original.matchingColumn(kv.getFamily(), kv.getQualifier()));
|
||||||
|
assertNotSame(original.getTimestamp(), kv.getTimestamp());
|
||||||
|
assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
|
||||||
|
|
||||||
|
// Test 2. Now test passing a kv that has explicit ts. It should not be
|
||||||
|
// changed by call to record write.
|
||||||
|
kv = new KeyValue(b, b, b, System.currentTimeMillis(), b);
|
||||||
|
original = kv.clone();
|
||||||
|
writer.write(new ImmutableBytesWritable(), kv);
|
||||||
|
assertTrue(original.equals(kv));
|
||||||
|
} finally {
|
||||||
|
if (writer != null && context != null) writer.close(context);
|
||||||
|
dir.getFileSystem(conf).delete(dir, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run small MR job.
|
* Run small MR job.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue