HADOOP-1834 Scanners ignore timestamp passed on creation

M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    (addContents): Added overrides that allow specifying a timestamp.
M  src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
    Made it so test inherits from HBaseTestCase instead of from HBaseClusterTestCase
    so could add in tests that do not use cluster.
    (testTimestampScanning): Added test for hadoop-1834 bug.
    (testTimestamp): Refactored to remove duplicated code.
M  src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    (getNext): Make it respect the timestamp set on construction.
M  src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    Removed eclipse yellow flag warnings around empty parens and
    auto-boxing longs.
    (getNext): Make it respect the timestamp set on construction.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@572980 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-09-05 16:00:01 +00:00
parent 49d4f333f4
commit 1d19158ac5
5 changed files with 277 additions and 153 deletions

View File

@ -30,6 +30,7 @@ Trunk (unreleased changes)
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8") HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables HADOOP-1832 listTables() returns duplicate tables
HADOOP-1834 Scanners ignore timestamp passed on creation
IMPROVEMENTS IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable HADOOP-1737 Make HColumnDescriptor data publically members settable

View File

@ -58,7 +58,9 @@ public class HMemcache {
/** /**
* Constructor * Constructor
*/ */
public HMemcache() {} public HMemcache() {
super();
}
/** represents the state of the memcache at a specified point in time */ /** represents the state of the memcache at a specified point in time */
static class Snapshot { static class Snapshot {
@ -68,7 +70,7 @@ public class HMemcache {
Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) { Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) {
super(); super();
this.memcacheSnapshot = memcache; this.memcacheSnapshot = memcache;
this.sequenceId = i; this.sequenceId = i.longValue();
} }
} }
@ -95,7 +97,8 @@ public class HMemcache {
if(memcache.size() == 0) { if(memcache.size() == 0) {
return null; return null;
} }
Snapshot retval = new Snapshot(memcache, log.startCacheFlush()); Snapshot retval =
new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
this.snapshot = memcache; this.snapshot = memcache;
history.add(memcache); history.add(memcache);
memcache = new TreeMap<HStoreKey, byte []>(); memcache = new TreeMap<HStoreKey, byte []>();
@ -294,18 +297,16 @@ public class HMemcache {
final Iterator<HStoreKey> keyIterators[]; final Iterator<HStoreKey> keyIterators[];
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow) HMemcacheScanner(final long timestamp, final Text targetCols[],
throws IOException { final Text firstRow)
throws IOException {
super(timestamp, targetCols); super(timestamp, targetCols);
lock.obtainReadLock(); lock.obtainReadLock();
try { try {
this.backingMaps = new TreeMap[history.size() + 1]; this.backingMaps = new TreeMap[history.size() + 1];
//NOTE: Since we iterate through the backing maps from 0 to n, we need //NOTE: Since we iterate through the backing maps from 0 to n, we need
// to put the memcache first, the newest history second, ..., etc. // to put the memcache first, the newest history second, ..., etc.
backingMaps[0] = memcache; backingMaps[0] = memcache;
for(int i = history.size() - 1; i > 0; i--) { for(int i = history.size() - 1; i > 0; i--) {
backingMaps[i] = history.elementAt(i); backingMaps[i] = history.elementAt(i);
@ -364,13 +365,25 @@ public class HMemcache {
*/ */
@Override @Override
boolean getNext(int i) { boolean getNext(int i) {
if (!keyIterators[i].hasNext()) { boolean result = false;
closeSubScanner(i); while (true) {
return false; if (!keyIterators[i].hasNext()) {
closeSubScanner(i);
break;
}
// Check key is < than passed timestamp for this scanner.
HStoreKey hsk = keyIterators[i].next();
if (hsk == null) {
throw new NullPointerException("Unexpected null key");
}
if (hsk.getTimestamp() <= this.timestamp) {
this.keys[i] = hsk;
this.vals[i] = backingMaps[i].get(keys[i]);
result = true;
break;
}
} }
this.keys[i] = keyIterators[i].next(); return result;
this.vals[i] = backingMaps[i].get(keys[i]);
return true;
} }
/** Shut down an individual map iterator. */ /** Shut down an individual map iterator. */

View File

@ -528,7 +528,7 @@ class HStore implements HConstants {
*/ */
public boolean needsCompaction() { public boolean needsCompaction() {
return this.storefiles != null && return this.storefiles != null &&
this.storefiles.size() >= this.compactionThreshold; this.storefiles.size() >= this.compactionThreshold;
} }
/** /**
@ -1334,13 +1334,20 @@ class HStore implements HConstants {
*/ */
@Override @Override
boolean getNext(int i) throws IOException { boolean getNext(int i) throws IOException {
boolean result = false;
ImmutableBytesWritable ibw = new ImmutableBytesWritable(); ImmutableBytesWritable ibw = new ImmutableBytesWritable();
if (!readers[i].next(keys[i], ibw)) { while (true) {
closeSubScanner(i); if (!readers[i].next(keys[i], ibw)) {
return false; closeSubScanner(i);
break;
}
if (keys[i].getTimestamp() <= this.timestamp) {
vals[i] = ibw.get();
result = true;
break;
}
} }
vals[i] = ibw.get(); return result;
return true;
} }
/** Close down the indicated reader. */ /** Close down the indicated reader. */

View File

@ -39,6 +39,8 @@ public abstract class HBaseTestCase extends TestCase {
protected FileSystem localFs = null; protected FileSystem localFs = null;
protected static final char FIRST_CHAR = 'a'; protected static final char FIRST_CHAR = 'a';
protected static final char LAST_CHAR = 'z'; protected static final char LAST_CHAR = 'z';
protected static final byte [] START_KEY_BYTES =
{FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
static { static {
StaticTestEnvironment.initialize(); StaticTestEnvironment.initialize();
@ -117,9 +119,9 @@ public abstract class HBaseTestCase extends TestCase {
Text endKey = r.getRegionInfo().getEndKey(); Text endKey = r.getRegionInfo().getEndKey();
byte [] startKeyBytes = startKey.getBytes(); byte [] startKeyBytes = startKey.getBytes();
if (startKeyBytes == null || startKeyBytes.length == 0) { if (startKeyBytes == null || startKeyBytes.length == 0) {
startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; startKeyBytes = START_KEY_BYTES;
} }
addContent(new HRegionLoader(r), column, startKeyBytes, endKey); addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1);
} }
/** /**
@ -132,8 +134,7 @@ public abstract class HBaseTestCase extends TestCase {
*/ */
protected static void addContent(final Loader updater, final String column) protected static void addContent(final Loader updater, final String column)
throws IOException { throws IOException {
addContent(updater, column, addContent(updater, column, START_KEY_BYTES, null);
new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null);
} }
/** /**
@ -148,6 +149,23 @@ public abstract class HBaseTestCase extends TestCase {
*/ */
protected static void addContent(final Loader updater, final String column, protected static void addContent(final Loader updater, final String column,
final byte [] startKeyBytes, final Text endKey) final byte [] startKeyBytes, final Text endKey)
throws IOException {
addContent(updater, column, startKeyBytes, endKey, -1);
}
/**
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the from 'aaa', 'aab', etc where key and value are the same.
* @param updater An instance of {@link Loader}.
* @param column
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @param ts Timestamp to write the content with.
* @throws IOException
*/
protected static void addContent(final Loader updater, final String column,
final byte [] startKeyBytes, final Text endKey, final long ts)
throws IOException { throws IOException {
// Add rows of three characters. The first character starts with the // Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the // 'a' character and runs up to 'z'. Per first character, we run the
@ -167,7 +185,11 @@ public abstract class HBaseTestCase extends TestCase {
long lockid = updater.startBatchUpdate(t); long lockid = updater.startBatchUpdate(t);
try { try {
updater.put(lockid, new Text(column), bytes); updater.put(lockid, new Text(column), bytes);
updater.commit(lockid); if (ts == -1) {
updater.commit(lockid);
} else {
updater.commit(lockid, ts);
}
lockid = -1; lockid = -1;
} finally { } finally {
if (lockid != -1) { if (lockid != -1) {
@ -190,6 +212,7 @@ public abstract class HBaseTestCase extends TestCase {
public long startBatchUpdate(final Text row) throws IOException; public long startBatchUpdate(final Text row) throws IOException;
public void put(long lockid, Text column, byte val[]) throws IOException; public void put(long lockid, Text column, byte val[]) throws IOException;
public void commit(long lockid) throws IOException; public void commit(long lockid) throws IOException;
public void commit(long lockid, long ts) throws IOException;
public void abort(long lockid) throws IOException; public void abort(long lockid) throws IOException;
} }
@ -208,6 +231,9 @@ public abstract class HBaseTestCase extends TestCase {
public void commit(long lockid) throws IOException { public void commit(long lockid) throws IOException {
this.region.commit(lockid, System.currentTimeMillis()); this.region.commit(lockid, System.currentTimeMillis());
} }
public void commit(long lockid, final long ts) throws IOException {
this.region.commit(lockid, ts);
}
public void put(long lockid, Text column, byte[] val) throws IOException { public void put(long lockid, Text column, byte[] val) throws IOException {
this.region.put(lockid, column, val); this.region.put(lockid, column, val);
} }
@ -231,6 +257,9 @@ public abstract class HBaseTestCase extends TestCase {
public void commit(long lockid) throws IOException { public void commit(long lockid) throws IOException {
this.table.commit(lockid); this.table.commit(lockid);
} }
public void commit(long lockid, final long ts) throws IOException {
this.table.commit(lockid, ts);
}
public void put(long lockid, Text column, byte[] val) throws IOException { public void put(long lockid, Text column, byte[] val) throws IOException {
this.table.put(lockid, column, val); this.table.put(lockid, column, val);
} }

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.TreeMap; import java.util.TreeMap;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
/** Tests user specifyable time stamps */ /**
public class TestTimestamp extends HBaseClusterTestCase { * Tests user specifiable time stamps
*/
public class TestTimestamp extends HBaseTestCase {
private static final long T0 = 10L; private static final long T0 = 10L;
private static final long T1 = 100L; private static final long T1 = 100L;
private static final long T2 = 200L;
private static final String COLUMN_NAME = "contents:"; private static final String COLUMN_NAME = "contents:";
private static final String TABLE_NAME = "test"; private static final String TABLE_NAME = "test";
@ -38,156 +43,225 @@ public class TestTimestamp extends HBaseClusterTestCase {
private static final Text TABLE = new Text(TABLE_NAME); private static final Text TABLE = new Text(TABLE_NAME);
private static final Text ROW = new Text("row"); private static final Text ROW = new Text("row");
private HTable table; /**
* Test that delete works according to description in <a
/** constructor */ * href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>
public TestTimestamp() { * when it comes to timestamps.
super(); * @throws IOException
} */
public void testDelete() throws IOException {
/** {@inheritDoc} */ HRegion r = createRegion();
@Override
public void setUp() throws Exception {
super.setUp();
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
try { try {
HBaseAdmin admin = new HBaseAdmin(conf); HRegionLoader loader = new HRegionLoader(r);
admin.createTable(desc); // Add a couple of values for three different timestamps.
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
} catch (Exception e) { addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
e.printStackTrace(); addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T2);
fail(); addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
// If I delete w/o specifying a timestamp, this means I'm deleting the
// latest.
delete(r, System.currentTimeMillis());
// Verify that I get back T2 through T0.
} finally {
r.close();
r.getLog().closeAndDelete();
} }
} }
/** the test */ private void delete(final HRegion r, final long ts) throws IOException {
public void testTimestamp() { long lockid = r.startUpdate(ROW);
r.delete(lockid, COLUMN);
r.commit(lockid, ts == -1? System.currentTimeMillis(): ts);
}
/**
* Test scanning against different timestamps.
* @throws IOException
*/
public void testTimestampScanning() throws IOException {
HRegion r = createRegion();
try { try {
table = new HTable(conf, TABLE); HRegionLoader loader = new HRegionLoader(r);
// Add a couple of values for three different timestamps.
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
// Get count of latest items.
int count = assertScanContentTimestamp(r, System.currentTimeMillis());
// Assert I get same count when I scan at each timestamp.
assertEquals(count, assertScanContentTimestamp(r, T0));
assertEquals(count, assertScanContentTimestamp(r, T1));
// Flush everything out to disk and then retry
r.flushcache(false);
assertEquals(count, assertScanContentTimestamp(r, T0));
assertEquals(count, assertScanContentTimestamp(r, T1));
} finally {
r.close();
r.getLog().closeAndDelete();
}
}
/*
* Assert that the scan returns only values < timestamp.
* @param r
* @param ts
* @return Count of items scanned.
* @throws IOException
*/
private int assertScanContentTimestamp(final HRegion r, final long ts)
throws IOException {
int count = 0;
HInternalScannerInterface scanner =
r.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts, null);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []>value = new TreeMap<Text, byte[]>();
while (scanner.next(key, value)) {
assertTrue(key.getTimestamp() <= ts);
Text row = key.getRow();
assertEquals(row.toString(),
new String(value.get(COLUMN), HConstants.UTF8_ENCODING));
count++;
value.clear();
}
} finally {
scanner.close();
}
return count;
}
/**
* Basic test of timestamps.
* TODO: Needs rewrite after hadoop-1784 gets fixed.
* @throws IOException
*/
public void testTimestamps() throws IOException {
MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
try {
HTable table = createTable();
// store a value specifying an update time // store a value specifying an update time
put(table, VERSION1.getBytes(HConstants.UTF8_ENCODING), T0);
long lockid = table.startUpdate(ROW);
table.put(lockid, COLUMN, VERSION1.getBytes(HConstants.UTF8_ENCODING));
table.commit(lockid, T0);
// store a value specifying 'now' as the update time // store a value specifying 'now' as the update time
put(table, LATEST.getBytes(HConstants.UTF8_ENCODING), -1);
lockid = table.startUpdate(ROW);
table.put(lockid, COLUMN, LATEST.getBytes(HConstants.UTF8_ENCODING));
table.commit(lockid);
// delete values older than T1 // delete values older than T1
long lockid = table.startUpdate(ROW);
lockid = table.startUpdate(ROW);
table.delete(lockid, COLUMN); table.delete(lockid, COLUMN);
table.commit(lockid, T1); table.commit(lockid, T1);
// now retrieve... // now retrieve...
assertGets(table);
// the most recent version:
byte[] bytes = table.get(ROW, COLUMN);
assertTrue(bytes != null && bytes.length != 0);
assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
// any version <= time T1
byte[][] values = table.get(ROW, COLUMN, T1, 3);
assertNull(values);
// the version from T0
values = table.get(ROW, COLUMN, T0, 3);
assertTrue(values.length == 1
&& VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// flush everything out to disk // flush everything out to disk
HRegionServer s = cluster.regionThreads.get(0).getRegionServer(); HRegionServer s = cluster.regionThreads.get(0).getRegionServer();
for(HRegion r: s.onlineRegions.values() ) { for(HRegion r: s.onlineRegions.values() ) {
r.flushcache(false); r.flushcache(false);
} }
// now retrieve... // now retrieve...
assertGets(table);
// the most recent version:
bytes = table.get(ROW, COLUMN);
assertTrue(bytes != null && bytes.length != 0);
assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
// any version <= time T1
values = table.get(ROW, COLUMN, T1, 3);
assertNull(values);
// the version from T0
values = table.get(ROW, COLUMN, T0, 3);
assertTrue(values.length == 1
&& VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// three versions older than now
values = table.get(ROW, COLUMN, 3);
assertTrue(values.length == 1
&& LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// Test scanners // Test scanners
assertScanCount(table, -1, 1);
HScannerInterface scanner = assertScanCount(table, T1, 0);
table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, 1);
assertEquals(results.size(), 1);
} finally {
scanner.close();
}
scanner = table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, T1);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, 0);
assertEquals(results.size(), 0);
} finally {
scanner.close();
}
scanner = table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, T0);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, 0);
assertEquals(results.size(), 0);
} finally {
scanner.close();
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); cluster.shutdown();
fail();
} }
} }
/*
* Test count of results scanning.
* @param table
* @param ts
* @param expectedCount
* @throws IOException
*/
private void assertScanCount(final HTable table, final long ts,
final int expectedCount)
throws IOException {
HScannerInterface scanner = (ts == -1)?
table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW):
table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, expectedCount);
assertEquals(results.size(), expectedCount);
} finally {
scanner.close();
}
}
/*
* Test can do basic gets.
* Used by testTimestamp above.
* @param table
* @throws IOException
*/
private void assertGets(final HTable table) throws IOException {
// the most recent version:
byte[] bytes = table.get(ROW, COLUMN);
assertTrue(bytes != null && bytes.length != 0);
assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
// any version <= time T1
byte[][] values = table.get(ROW, COLUMN, T1, 3);
assertNull(values);
// the version from T0
values = table.get(ROW, COLUMN, T0, 3);
assertTrue(values.length == 1
&& VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// three versions older than now
values = table.get(ROW, COLUMN, 3);
assertTrue(values.length == 1
&& LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
}
/*
* Put values.
* @param table
* @param bytes
* @param ts
* @throws IOException
*/
private void put(final HTable table, final byte [] bytes, final long ts)
throws IOException {
long lockid = table.startUpdate(ROW);
table.put(lockid, COLUMN, bytes);
if (ts == -1) {
table.commit(lockid);
} else {
table.commit(lockid, ts);
}
}
/*
* Create a table named TABLE_NAME.
* @return An instance of an HTable connected to the created table.
* @throws IOException
*/
private HTable createTable() throws IOException {
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
return new HTable(conf, TABLE);
}
private HRegion createRegion() throws IOException {
HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
HTableDescriptor htd = createTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(COLUMN_NAME));
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
}
} }