HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1152947 13f79535-47bb-0310-9956-ffa450edef68
commit 0cd2d8d932
parent 50e3d546eb
CHANGES.txt
@@ -182,6 +182,7 @@ Release 0.91.0 - Unreleased
    code then HTable object loops continuously waiting for the root region
    by using /hbase as the base node.(ramkrishna.s.vasudevan)
    HBASE-4032 HBASE-451 improperly breaks public API HRegionInfo#getTableDesc
+   HBASE-4148 HFileOutputFormat doesn't fill in TIMERANGE_KEY metadata (Jonathan Hsieh)
 
 IMPROVEMENTS
    HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,9 +48,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -56,9 +60,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
  * Currently, can only write files to a single column family at a
@@ -71,7 +72,8 @@ import org.apache.commons.logging.LogFactory;
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  TimeRangeTracker trt = new TimeRangeTracker();
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
     // Get the path of the temporary output file
@@ -135,6 +137,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
 
         // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
+        trt.includeTimestamp(kv);
         wl.writer.append(kv);
         wl.written += length;
 
@@ -178,8 +181,10 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
           Bytes.toBytes(System.currentTimeMillis()));
       w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
           Bytes.toBytes(context.getTaskAttemptID().toString()));
-      w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+      w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
           Bytes.toBytes(true));
+      w.appendFileInfo(StoreFile.TIMERANGE_KEY,
+          WritableUtils.toByteArray(trt));
       w.close();
     }
   }
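Context for the change above, not part of the commit: a standalone sketch (hypothetical class name) of the round trip the new metadata relies on. The writer accumulates each KeyValue's timestamp in a TimeRangeTracker and serializes it with WritableUtils.toByteArray, and a reader rebuilds it with Writables.copyWritable, as the new test below does.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;

// Hypothetical illustration class, not part of the commit.
public class TimeRangeRoundTrip {
  public static void main(String[] args) throws Exception {
    final byte[] b = Bytes.toBytes("b");
    TimeRangeTracker trt = new TimeRangeTracker();
    // Feed the tracker one KeyValue per write, exactly as the new
    // trt.includeTimestamp(kv) call in getRecordWriter() does.
    trt.includeTimestamp(new KeyValue(b, b, b, 2000, b));
    trt.includeTimestamp(new KeyValue(b, b, b, 1000, b));

    // Serialize as it is stored under StoreFile.TIMERANGE_KEY ...
    byte[] range = WritableUtils.toByteArray(trt);

    // ... and deserialize the way a reader would.
    TimeRangeTracker copy = new TimeRangeTracker();
    Writables.copyWritable(range, copy);
    System.out.println(copy.getMinimumTimestamp()); // 1000
    System.out.println(copy.getMaximumTimestamp()); // 2000
  }
}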
src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -111,7 +111,7 @@ public class StoreFile {
   /** Bloom filter Type in FileInfo */
   static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
   /** Key for Timerange information in metadata*/
-  static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
+  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
   /** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
   static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
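Note on the visibility bump, not part of the commit text: StoreFile lives in org.apache.hadoop.hbase.regionserver while HFileOutputFormat lives in org.apache.hadoop.hbase.mapreduce, so the field must become public for the writer's cross-package reference to compile. A minimal illustration (hypothetical class name):

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.regionserver.StoreFile;

// Hypothetical snippet: this cross-package reference compiles only now
// that TIMERANGE_KEY is public; package-private access would not reach here.
class TimerangeKeyAccessCheck {
  static final byte[] KEY = StoreFile.TIMERANGE_KEY;
}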
src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -57,8 +58,10 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -229,6 +232,76 @@ public class TestHFileOutputFormat {
     return context;
   }
 
+  /*
+   * Test that {@link HFileOutputFormat} creates an HFile with TIMERANGE
+   * metadata used by time-restricted scans.
+   */
+  @Test
+  public void test_TIMERANGE()
+  throws IOException, InterruptedException {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+      HBaseTestingUtility.getTestDir("test_TIMERANGE_present");
+    LOG.info("Timerange dir writing to dir: " + dir);
+    try {
+      // build a record writer using HFileOutputFormat
+      Job job = new Job(conf);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = new TaskAttemptContext(job.getConfiguration(),
+        new TaskAttemptID());
+      HFileOutputFormat hof = new HFileOutputFormat();
+      writer = hof.getRecordWriter(context);
+
+      // Pass two key values with explicit timestamps
+      final byte [] b = Bytes.toBytes("b");
+
+      // value 1 with timestamp 2000
+      KeyValue kv = new KeyValue(b, b, b, 2000, b);
+      KeyValue original = kv.clone();
+      writer.write(new ImmutableBytesWritable(), kv);
+      assertEquals(original, kv);
+
+      // value 2 with timestamp 1000
+      kv = new KeyValue(b, b, b, 1000, b);
+      original = kv.clone();
+      writer.write(new ImmutableBytesWritable(), kv);
+      assertEquals(original, kv);
+
+      // verify that the file has the proper FileInfo.
+      writer.close(context);
+
+      // the generated file lives 3 directories down and is the only file,
+      // so we traverse the dirs to get to the file
+      // ./_temporary/_attempt__0000_r_000000_0/b/1979617994050536795
+      FileSystem fs = FileSystem.get(conf);
+      Path path = HFileOutputFormat.getOutputPath(job);
+      FileStatus[] sub1 = fs.listStatus(path);
+      FileStatus[] sub2 = fs.listStatus(sub1[0].getPath());
+      FileStatus[] sub3 = fs.listStatus(sub2[0].getPath());
+      FileStatus[] file = fs.listStatus(sub3[0].getPath());
+
+      // open as HFile Reader and pull out TIMERANGE FileInfo.
+      HFile.Reader rd = new HFile.Reader(fs, file[0].getPath(), null, true,
+        false);
+      Map<byte[], byte[]> finfo = rd.loadFileInfo();
+      byte[] range = finfo.get("TIMERANGE".getBytes());
+      assertNotNull(range);
+
+      // unmarshall and check values.
+      TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+      Writables.copyWritable(range, timeRangeTracker);
+      LOG.info(timeRangeTracker.getMinimumTimestamp() +
+        "...." + timeRangeTracker.getMaximumTimestamp());
+      assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
+      assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
+    } finally {
+      if (writer != null && context != null) writer.close(context);
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
   /**
    * Run small MR job.
    */
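For completeness, not part of the commit: a hedged sketch of the consumer side using the standard client API of this era (the table name and class name are made up). A scan with a time range is what the TIMERANGE file metadata serves, letting store files whose timestamp range falls entirely outside the window be skipped.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

// Hypothetical illustration class, not part of the commit.
public class TimeRestrictedScan {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable"); // hypothetical table name

    // Only cells stamped in [1000, 2000) are wanted; with TIMERANGE
    // metadata present, files entirely outside this window need not
    // be read during the scan.
    Scan scan = new Scan();
    scan.setTimeRange(1000L, 2000L);
    ResultScanner scanner = table.getScanner(scan);
    try {
      scanner.next(); // iterate results as usual
    } finally {
      scanner.close();
      table.close();
    }
  }
}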