HBASE-2803 Remove remaining Get code from Store.java,etc

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@964645 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Rawson 2010-07-15 23:49:59 +00:00
parent e95b60b9d3
commit e4ee10d24f
11 changed files with 136 additions and 309 deletions

View File

@ -25,6 +25,7 @@ Release 0.21.0 - Unreleased
HBASE-2565 Remove contrib module from hbase
HBASE-2397 Bytes.toStringBinary escapes printable chars
HBASE-2771 Update our hadoop jar to be latest from 0.20-append branch
HBASE-2803 Remove remaining Get code from Store.java,etc
BUG FIXES
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew

View File

@ -3007,14 +3007,11 @@ public class HRegion implements HeapSize { // , Writable{
try {
Store store = stores.get(family);
// TODO call the proper GET API
// Get the old value:
Get get = new Get(row);
get.addColumn(family, qualifier);
List<KeyValue> results = new ArrayList<KeyValue>();
NavigableSet<byte[]> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qualifier);
store.get(get, qualifiers, results);
List<KeyValue> results = get(get);
if (!results.isEmpty()) {
KeyValue kv = results.get(0);

View File

@ -400,35 +400,8 @@ public class MemStore implements HeapSize {
}
}
//
// HBASE-880/1249/1304
//
/**
* Perform a single-row Get on the and snapshot, placing results
* into the specified KV list.
* <p>
* This will return true if it is determined that the query is complete
* and it is not necessary to check any storefiles after this.
* <p>
* Otherwise, it will return false and you should continue on.
* @param matcher Column matcher
* @param result List to add results to
* @return true if done with store (early-out), false if not
*/
public boolean get(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
return true;
}
matcher.update();
return internalGet(this.snapshot, matcher, result) || matcher.isDone();
} finally {
this.lock.readLock().unlock();
}
}
// TODO fix this not to use QueryMatcher!
/**
* Gets from either the memstore or the snapshop, and returns a code
* to let you know which is which.

View File

@ -41,18 +41,39 @@ public class ReadWriteConsistencyControl {
private static final ThreadLocal<Long> perThreadReadPoint =
new ThreadLocal<Long>();
/**
* Get this thread's read point. Used primarily by the memstore scanner to
* know which values to skip (ie: have not been completed/committed to
* memstore).
*/
public static long getThreadReadPoint() {
return perThreadReadPoint.get();
}
/**
* Set the thread read point to the given value. The thread RWCC
* is used by the Memstore scanner so it knows which values to skip.
* Give it a value of 0 if you want everything.
*/
public static void setThreadReadPoint(long readPoint) {
perThreadReadPoint.set(readPoint);
}
/**
* Set the thread RWCC read point to whatever the current read point is in
* this particular instance of RWCC. Returns the new thread read point value.
*/
public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
perThreadReadPoint.set(rwcc.memstoreReadPoint());
return getThreadReadPoint();
}
/**
* Set the thread RWCC read point to 0 (include everything).
*/
public static void resetThreadReadPoint() {
perThreadReadPoint.set(0L);
}
public WriteEntry beginMemstoreInsert() {
synchronized (writeQueue) {

View File

@ -190,7 +190,7 @@ public class Store implements HeapSize {
this.storefiles = ImmutableList.copyOf(loadStoreFiles());
}
HColumnDescriptor getFamily() {
public HColumnDescriptor getFamily() {
return this.family;
}
@ -958,13 +958,6 @@ public class Store implements HeapSize {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
boolean b = set.remove(kv);
if (LOG.isDebugEnabled()) {
LOG.debug(kv.toString() + " expired: " + b);
}
}
static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
return key.getTimestamp() < oldestTimestamp;
}
@ -1204,7 +1197,7 @@ public class Store implements HeapSize {
* Return a scanner for both the memstore and the HStore files
* @throws IOException
*/
protected KeyValueScanner getScanner(Scan scan,
public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) throws IOException {
lock.readLock().lock();
try {
@ -1288,85 +1281,6 @@ public class Store implements HeapSize {
return this.region.regionInfo;
}
/**
* Convenience method that implements the old MapFile.getClosest on top of
* HFile Scanners. getClosest used seek to the asked-for key or just after
* (HFile seeks to the key or just before).
* @param s Scanner to use
* @param kv Key to find.
* @return True if we were able to seek the scanner to <code>b</code> or to
* the key just after.
* @throws IOException
*/
static boolean getClosest(final HFileScanner s, final KeyValue kv)
throws IOException {
// Pass offsets to key content of a KeyValue; thats whats in the hfile index.
int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
if (result < 0) {
// Not in file. Will the first key do?
if (!s.seekTo()) {
return false;
}
} else if (result > 0) {
// Less than what was asked for but maybe < because we're asking for
// r/c/HConstants.LATEST_TIMESTAMP -- what was returned was r/c-1/SOME_TS...
// A next will get us a r/c/SOME_TS.
if (!s.next()) {
return false;
}
}
return true;
}
/**
* Retrieve results from this store given the specified Get parameters.
* @param get Get operation
* @param columns List of columns to match, can be empty (not null)
* @param result List to add results to
* @throws IOException
*/
public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result)
throws IOException {
KeyComparator keyComparator = this.comparator.getRawComparator();
// Column matching and version enforcement
QueryMatcher matcher = new QueryMatcher(get, this.family.getName(), columns,
this.ttl, keyComparator, versionsToReturn(get.getMaxVersions()));
this.lock.readLock().lock();
try {
// Read from memstore
if(this.memstore.get(matcher, result)) {
// Received early-out from memstore
return;
}
// Check if we even have storefiles
if (this.storefiles.isEmpty()) {
return;
}
// Get storefiles for this store
List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
for (StoreFile sf : Iterables.reverse(this.storefiles)) {
StoreFile.Reader r = sf.getReader();
if (r == null) {
LOG.warn("StoreFile " + sf + " has a null Reader");
continue;
}
// Get a scanner that caches the block and uses pread
storefileScanners.add(r.getScanner(true, true));
}
// StoreFileGetScan will handle reading this store's storefiles
StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
// Run a GET scan and put results into the specified list
scanner.get(result);
} finally {
this.lock.readLock().unlock();
}
}
/**
* Increments the value for the given row/family/qualifier.
*

View File

@ -96,7 +96,7 @@ public class TimeRangeTracker implements Writable {
* @param timestamp the timestamp value to include
*/
private void includeTimestamp(final long timestamp) {
if (maximumTimestamp==-1) {
if (maximumTimestamp == -1) {
minimumTimestamp = timestamp;
maximumTimestamp = timestamp;
}

View File

@ -29,6 +29,9 @@ import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
@ -39,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
@ -48,6 +52,9 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
@ -280,7 +287,7 @@ public class HBaseTestingUtility {
// Do old style too just to be safe.
this.conf.set("fs.default.name", fs.getUri().toString());
this.dfsCluster.waitClusterUp();
// Start up a zk cluster.
if (this.zkCluster == null) {
startMiniZKCluster(this.clusterTestBuildDir);
@ -962,4 +969,42 @@ public class HBaseTestingUtility {
Threads.sleep(1000);
}
}
}
/**
* Do a small get/scan against one store. This is required because store
* has no actual methods of querying itself, and relies on StoreScanner.
*/
public static List<KeyValue> getFromStoreFile(Store store,
Get get) throws IOException {
ReadWriteConsistencyControl.resetThreadReadPoint();
Scan scan = new Scan(get);
InternalScanner scanner = (InternalScanner) store.getScanner(scan,
scan.getFamilyMap().get(store.getFamily().getName()));
List<KeyValue> result = new ArrayList<KeyValue>();
scanner.next(result);
if (!result.isEmpty()) {
// verify that we are on the row we want:
KeyValue kv = result.get(0);
if (!Bytes.equals(kv.getRow(), get.getRow())) {
result.clear();
}
}
return result;
}
/**
* Do a small get/scan against one store. This is required because store
* has no actual methods of querying itself, and relies on StoreScanner.
*/
public static List<KeyValue> getFromStoreFile(Store store,
byte [] row,
NavigableSet<byte[]> columns
) throws IOException {
Get get = new Get(row);
Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
s.put(store.getFamily().getName(), columns);
return getFromStoreFile(store,get);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one

View File

@ -1367,6 +1367,7 @@ public class TestHRegion extends HBaseTestCase {
String method = this.getName();
initHRegion(tableName, method, families);
//Putting data in Region
Put put = new Put(row1);
put.add(fam1, null, null);
@ -1384,10 +1385,12 @@ public class TestHRegion extends HBaseTestCase {
scan.addFamily(fam2);
scan.addFamily(fam4);
is = (RegionScanner) region.getScanner(scan);
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
assertEquals(1, ((RegionScanner)is).storeHeap.getHeap().size());
scan = new Scan();
is = (RegionScanner) region.getScanner(scan);
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
assertEquals(families.length -1,
((RegionScanner)is).storeHeap.getHeap().size());
}

View File

@ -570,64 +570,6 @@ public class TestMemStore extends TestCase {
}
}
public void testGet_Basic_Found() throws IOException {
byte [] row = Bytes.toBytes("testrow");
byte [] fam = Bytes.toBytes("testfamily");
byte [] qf1 = Bytes.toBytes("testqualifier1");
byte [] qf2 = Bytes.toBytes("testqualifier2");
byte [] qf3 = Bytes.toBytes("testqualifier3");
byte [] val = Bytes.toBytes("testval");
//Setting up memstore
KeyValue add1 = new KeyValue(row, fam ,qf1, val);
KeyValue add2 = new KeyValue(row, fam ,qf2, val);
KeyValue add3 = new KeyValue(row, fam ,qf3, val);
memstore.add(add1);
memstore.add(add2);
memstore.add(add3);
//test
Get get = new Get(row);
NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
columns.add(qf2);
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
List<KeyValue> result = new ArrayList<KeyValue>();
boolean res = memstore.get(matcher, result);
assertEquals(true, res);
}
public void testGet_Basic_NotFound() throws IOException {
byte [] row = Bytes.toBytes("testrow");
byte [] fam = Bytes.toBytes("testfamily");
byte [] qf1 = Bytes.toBytes("testqualifier1");
byte [] qf2 = Bytes.toBytes("testqualifier2");
byte [] qf3 = Bytes.toBytes("testqualifier3");
byte [] val = Bytes.toBytes("testval");
//Setting up memstore
KeyValue add1 = new KeyValue(row, fam ,qf1, val);
KeyValue add3 = new KeyValue(row, fam ,qf3, val);
memstore.add(add1);
memstore.add(add3);
//test
Get get = new Get(row);
NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
columns.add(qf2);
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
List<KeyValue> result = new ArrayList<KeyValue>();
boolean res = memstore.get(matcher, result);
assertEquals(false, res);
}
public void testGet_memstoreAndSnapShot() throws IOException {
byte [] row = Bytes.toBytes("testrow");
byte [] fam = Bytes.toBytes("testfamily");
@ -638,16 +580,6 @@ public class TestMemStore extends TestCase {
byte [] qf5 = Bytes.toBytes("testqualifier5");
byte [] val = Bytes.toBytes("testval");
//Creating get
Get get = new Get(row);
NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
columns.add(qf2);
columns.add(qf4);
long ttl = Long.MAX_VALUE;
QueryMatcher matcher =
new QueryMatcher(get, fam, columns, ttl, KeyValue.KEY_COMPARATOR, 1);
//Setting up memstore
memstore.add(new KeyValue(row, fam ,qf1, val));
memstore.add(new KeyValue(row, fam ,qf2, val));
@ -660,64 +592,6 @@ public class TestMemStore extends TestCase {
memstore.add(new KeyValue(row, fam ,qf4, val));
memstore.add(new KeyValue(row, fam ,qf5, val));
assertEquals(2, memstore.kvset.size());
List<KeyValue> result = new ArrayList<KeyValue>();
boolean res = memstore.get(matcher, result);
assertEquals(true, res);
}
public void testGet_SpecificTimeStamp() throws IOException {
byte [] row = Bytes.toBytes("testrow");
byte [] fam = Bytes.toBytes("testfamily");
byte [] qf1 = Bytes.toBytes("testqualifier1");
byte [] qf2 = Bytes.toBytes("testqualifier2");
byte [] qf3 = Bytes.toBytes("testqualifier3");
byte [] val = Bytes.toBytes("testval");
long ts1 = System.currentTimeMillis();
long ts2 = ts1++;
long ts3 = ts2++;
//Creating get
Get get = new Get(row);
get.setTimeStamp(ts2);
NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
columns.add(qf1);
columns.add(qf2);
columns.add(qf3);
long ttl = Long.MAX_VALUE;
QueryMatcher matcher = new QueryMatcher(get, fam, columns, ttl,
KeyValue.KEY_COMPARATOR, 1);
//Setting up expected
List<KeyValue> expected = new ArrayList<KeyValue>();
KeyValue kv1 = new KeyValue(row, fam ,qf1, ts2, val);
KeyValue kv2 = new KeyValue(row, fam ,qf2, ts2, val);
KeyValue kv3 = new KeyValue(row, fam ,qf3, ts2, val);
expected.add(kv1);
expected.add(kv2);
expected.add(kv3);
//Setting up memstore
memstore.add(new KeyValue(row, fam ,qf1, ts1, val));
memstore.add(new KeyValue(row, fam ,qf2, ts1, val));
memstore.add(new KeyValue(row, fam ,qf3, ts1, val));
memstore.add(kv1);
memstore.add(kv2);
memstore.add(kv3);
memstore.add(new KeyValue(row, fam ,qf1, ts3, val));
memstore.add(new KeyValue(row, fam ,qf2, ts3, val));
memstore.add(new KeyValue(row, fam ,qf3, ts3, val));
//Get
List<KeyValue> result = new ArrayList<KeyValue>();
memstore.get(matcher, result);
assertEquals(expected.size(), result.size());
for(int i=0; i<expected.size(); i++){
assertEquals(expected.get(i), result.get(i));
}
}
//////////////////////////////////////////////////////////////////////////////

View File

@ -64,7 +64,7 @@ import com.google.common.base.Joiner;
*/
public class TestStore extends TestCase {
public static final Log LOG = LogFactory.getLog(TestStore.class);
Store store;
byte [] table = Bytes.toBytes("table");
byte [] family = Bytes.toBytes("family");
@ -101,7 +101,7 @@ public class TestStore extends TestCase {
Iterator<byte[]> iter = qualifiers.iterator();
while(iter.hasNext()){
byte [] next = iter.next();
expected.add(new KeyValue(row, family, next, null));
expected.add(new KeyValue(row, family, next, 1, (byte[])null));
get.addColumn(family, next);
}
}
@ -109,7 +109,7 @@ public class TestStore extends TestCase {
private void init(String methodName) throws IOException {
init(methodName, HBaseConfiguration.create());
}
private void init(String methodName, Configuration conf)
throws IOException {
//Setting up a Store
@ -142,8 +142,8 @@ public class TestStore extends TestCase {
public void testEmptyStoreFile() throws IOException {
init(this.getName());
// Write a store file.
this.store.add(new KeyValue(row, family, qf1, null));
this.store.add(new KeyValue(row, family, qf2, null));
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
flush(1);
// Now put in place an empty store file. Its a little tricky. Have to
// do manually with hacked in sequence id.
@ -152,7 +152,7 @@ public class TestStore extends TestCase {
long seqid = f.getMaxSequenceId();
Configuration c = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(c);
StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
StoreFile.DEFAULT_BLOCKSIZE_SMALL);
w.appendMetadata(seqid + 1, false);
w.close();
@ -163,7 +163,10 @@ public class TestStore extends TestCase {
this.store.getFamily(), fs, c);
System.out.println(this.store.getHRegionInfo().getEncodedName());
assertEquals(2, this.store.getStorefilesCount());
this.store.get(get, qualifiers, result);
result = HBaseTestingUtility.getFromStoreFile(store,
get.getRow(),
qualifiers);
assertEquals(1, result.size());
}
@ -175,15 +178,16 @@ public class TestStore extends TestCase {
init(this.getName());
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, null));
this.store.add(new KeyValue(row, family, qf2, null));
this.store.add(new KeyValue(row, family, qf3, null));
this.store.add(new KeyValue(row, family, qf4, null));
this.store.add(new KeyValue(row, family, qf5, null));
this.store.add(new KeyValue(row, family, qf6, null));
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
//Get
this.store.get(get, qualifiers, result);
result = HBaseTestingUtility.getFromStoreFile(store,
get.getRow(), qualifiers);
//Compare
assertCheck();
@ -197,25 +201,28 @@ public class TestStore extends TestCase {
init(this.getName());
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, null));
this.store.add(new KeyValue(row, family, qf2, null));
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
//flush
flush(1);
//Add more data
this.store.add(new KeyValue(row, family, qf3, null));
this.store.add(new KeyValue(row, family, qf4, null));
this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
//flush
flush(2);
//Add more data
this.store.add(new KeyValue(row, family, qf5, null));
this.store.add(new KeyValue(row, family, qf6, null));
this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
//flush
flush(3);
//Get
this.store.get(get, qualifiers, result);
result = HBaseTestingUtility.getFromStoreFile(store,
get.getRow(),
qualifiers);
//this.store.get(get, qualifiers, result);
//Need to sort the result since multiple files
Collections.sort(result, KeyValue.COMPARATOR);
@ -232,23 +239,24 @@ public class TestStore extends TestCase {
init(this.getName());
//Put data in memstore
this.store.add(new KeyValue(row, family, qf1, null));
this.store.add(new KeyValue(row, family, qf2, null));
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
//flush
flush(1);
//Add more data
this.store.add(new KeyValue(row, family, qf3, null));
this.store.add(new KeyValue(row, family, qf4, null));
this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
//flush
flush(2);
//Add more data
this.store.add(new KeyValue(row, family, qf5, null));
this.store.add(new KeyValue(row, family, qf6, null));
this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
//Get
this.store.get(get, qualifiers, result);
result = HBaseTestingUtility.getFromStoreFile(store,
get.getRow(), qualifiers);
//Need to sort the result since multiple files
Collections.sort(result, KeyValue.COMPARATOR);
@ -316,7 +324,7 @@ public class TestStore extends TestCase {
NavigableSet<byte[]> cols = new TreeSet<byte[]>();
cols.add(qf1);
this.store.get(get, cols, results);
results = HBaseTestingUtility.getFromStoreFile(store, get);
assertEquals(2, results.size());
long ts1 = results.get(0).getTimestamp();
@ -342,21 +350,21 @@ public class TestStore extends TestCase {
// Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf);
assertEquals(FaultyFileSystem.class, fs.getClass());
// Initialize region
init(getName(), conf);
LOG.info("Adding some data");
this.store.add(new KeyValue(row, family, qf1, null));
this.store.add(new KeyValue(row, family, qf2, null));
this.store.add(new KeyValue(row, family, qf3, null));
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
LOG.info("Before flush, we should have no files");
FileStatus[] files = fs.listStatus(store.getHomedir());
Path[] paths = FileUtil.stat2Paths(files);
System.err.println("Got paths: " + Joiner.on(",").join(paths));
assertEquals(0, paths.length);
//flush
try {
LOG.info("Flushing");
@ -365,7 +373,7 @@ public class TestStore extends TestCase {
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Fault injected"));
}
LOG.info("After failed flush, we should still have no files!");
files = fs.listStatus(store.getHomedir());
paths = FileUtil.stat2Paths(files);
@ -373,27 +381,27 @@ public class TestStore extends TestCase {
assertEquals(0, paths.length);
}
static class FaultyFileSystem extends FilterFileSystem {
List<SoftReference<FaultyOutputStream>> outStreams =
new ArrayList<SoftReference<FaultyOutputStream>>();
private long faultPos = 200;
public FaultyFileSystem() {
super(new LocalFileSystem());
System.err.println("Creating faulty!");
}
@Override
public FSDataOutputStream create(Path p) throws IOException {
return new FaultyOutputStream(super.create(p), faultPos);
}
}
static class FaultyOutputStream extends FSDataOutputStream {
volatile long faultPos = Long.MAX_VALUE;
public FaultyOutputStream(FSDataOutputStream out,
long faultPos) throws IOException {
super(out, null);
@ -406,7 +414,7 @@ public class TestStore extends TestCase {
injectFault();
super.write(buf, offset, length);
}
private void injectFault() throws IOException {
if (getPos() >= faultPos) {
throw new IOException("Fault injected");
@ -414,8 +422,8 @@ public class TestStore extends TestCase {
}
}
private static void flushStore(Store store, long id) throws IOException {
StoreFlusher storeFlusher = store.getStoreFlusher(id);
storeFlusher.prepare();
@ -469,41 +477,32 @@ public class TestStore extends TestCase {
this.store.add(kv);
}
NavigableSet<byte[]> columns = new ConcurrentSkipListSet<byte[]>(
Bytes.BYTES_COMPARATOR);
columns.add(qf1);
List<KeyValue> result;
Get get = new Get(Bytes.toBytes(1));
get.addColumn(family,qf1);
get.setTimeRange(0,15);
result = new ArrayList<KeyValue>();
this.store.get(get, columns, result);
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()>0);
get.setTimeRange(40,90);
result = new ArrayList<KeyValue>();
this.store.get(get, columns, result);
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()>0);
get.setTimeRange(10,45);
result = new ArrayList<KeyValue>();
this.store.get(get, columns, result);
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()>0);
get.setTimeRange(80,145);
result = new ArrayList<KeyValue>();
this.store.get(get, columns, result);
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()>0);
get.setTimeRange(1,2);
result = new ArrayList<KeyValue>();
this.store.get(get, columns, result);
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()>0);
get.setTimeRange(90,200);
result = new ArrayList<KeyValue>();
this.store.get(get, columns, result);
result = HBaseTestingUtility.getFromStoreFile(store, get);
assertTrue(result.size()==0);
}
}