From 1d19158ac5333a39c8c50fbd852133bb99ac4906 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 5 Sep 2007 16:00:01 +0000 Subject: [PATCH] HADOOP-1834 Scanners ignore timestamp passed on creation M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (addContents): Added overrides that allow specifying a timestamp. M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java Made it so test inherits from HBaseTestCase instead of from HBaseClusterTestCase so could add in tests that do not use cluster. (testTimestampScanning): Added test for hadoop-1834 bug. (testTimestamp): Refactored to remove duplicated code. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (getNext): Make it respect the timestamp set on construction. M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Removed eclipse yellow flag warnings around empty parens and auto-boxing longs. (getNext): Make it respect the timestamp set on construction. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@572980 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../org/apache/hadoop/hbase/HMemcache.java | 43 ++- src/java/org/apache/hadoop/hbase/HStore.java | 19 +- .../apache/hadoop/hbase/HBaseTestCase.java | 39 ++- .../apache/hadoop/hbase/TestTimestamp.java | 328 +++++++++++------- 5 files changed, 277 insertions(+), 153 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 720651bf3ec..35f4fd807cd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -30,6 +30,7 @@ Trunk (unreleased changes) HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8") HADOOP-1832 listTables() returns duplicate tables + HADOOP-1834 Scanners ignore timestamp passed on creation IMPROVEMENTS HADOOP-1737 Make HColumnDescriptor data publically members settable diff --git a/src/java/org/apache/hadoop/hbase/HMemcache.java 
b/src/java/org/apache/hadoop/hbase/HMemcache.java index f5c01fc4e47..073355532f7 100644 --- a/src/java/org/apache/hadoop/hbase/HMemcache.java +++ b/src/java/org/apache/hadoop/hbase/HMemcache.java @@ -58,7 +58,9 @@ public class HMemcache { /** * Constructor */ - public HMemcache() {} + public HMemcache() { + super(); + } /** represents the state of the memcache at a specified point in time */ static class Snapshot { @@ -68,7 +70,7 @@ public class HMemcache { Snapshot(final TreeMap memcache, final Long i) { super(); this.memcacheSnapshot = memcache; - this.sequenceId = i; + this.sequenceId = i.longValue(); } } @@ -95,7 +97,8 @@ public class HMemcache { if(memcache.size() == 0) { return null; } - Snapshot retval = new Snapshot(memcache, log.startCacheFlush()); + Snapshot retval = + new Snapshot(memcache, Long.valueOf(log.startCacheFlush())); this.snapshot = memcache; history.add(memcache); memcache = new TreeMap(); @@ -294,18 +297,16 @@ public class HMemcache { final Iterator keyIterators[]; @SuppressWarnings("unchecked") - HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow) - throws IOException { - + HMemcacheScanner(final long timestamp, final Text targetCols[], + final Text firstRow) + throws IOException { super(timestamp, targetCols); - lock.obtainReadLock(); try { this.backingMaps = new TreeMap[history.size() + 1]; //NOTE: Since we iterate through the backing maps from 0 to n, we need - // to put the memcache first, the newest history second, ..., etc. - + // to put the memcache first, the newest history second, ..., etc. 
backingMaps[0] = memcache; for(int i = history.size() - 1; i > 0; i--) { backingMaps[i] = history.elementAt(i); @@ -364,13 +365,25 @@ public class HMemcache { */ @Override boolean getNext(int i) { - if (!keyIterators[i].hasNext()) { - closeSubScanner(i); - return false; + boolean result = false; + while (true) { + if (!keyIterators[i].hasNext()) { + closeSubScanner(i); + break; + } + // Check key is <= the timestamp passed for this scanner. + HStoreKey hsk = keyIterators[i].next(); + if (hsk == null) { + throw new NullPointerException("Unexpected null key"); + } + if (hsk.getTimestamp() <= this.timestamp) { + this.keys[i] = hsk; + this.vals[i] = backingMaps[i].get(keys[i]); + result = true; + break; + } } - this.keys[i] = keyIterators[i].next(); - this.vals[i] = backingMaps[i].get(keys[i]); - return true; + return result; } /** Shut down an individual map iterator. */ diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java index 5da541cddf2..577b9a731b1 100644 --- a/src/java/org/apache/hadoop/hbase/HStore.java +++ b/src/java/org/apache/hadoop/hbase/HStore.java @@ -528,7 +528,7 @@ class HStore implements HConstants { */ public boolean needsCompaction() { return this.storefiles != null && - this.storefiles.size() >= this.compactionThreshold; + this.storefiles.size() >= this.compactionThreshold; } /** @@ -1334,13 +1334,20 @@ class HStore implements HConstants { */ @Override boolean getNext(int i) throws IOException { + boolean result = false; ImmutableBytesWritable ibw = new ImmutableBytesWritable(); - if (!readers[i].next(keys[i], ibw)) { - closeSubScanner(i); - return false; + while (true) { + if (!readers[i].next(keys[i], ibw)) { + closeSubScanner(i); + break; + } + if (keys[i].getTimestamp() <= this.timestamp) { + vals[i] = ibw.get(); + result = true; + break; + } } - vals[i] = ibw.get(); - return true; + return result; } /** Close down the indicated reader.
*/ diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 8050f883384..f764b1def45 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -39,6 +39,8 @@ public abstract class HBaseTestCase extends TestCase { protected FileSystem localFs = null; protected static final char FIRST_CHAR = 'a'; protected static final char LAST_CHAR = 'z'; + protected static final byte [] START_KEY_BYTES = + {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; static { StaticTestEnvironment.initialize(); @@ -117,9 +119,9 @@ public abstract class HBaseTestCase extends TestCase { Text endKey = r.getRegionInfo().getEndKey(); byte [] startKeyBytes = startKey.getBytes(); if (startKeyBytes == null || startKeyBytes.length == 0) { - startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; + startKeyBytes = START_KEY_BYTES; } - addContent(new HRegionLoader(r), column, startKeyBytes, endKey); + addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1); } /** @@ -132,8 +134,7 @@ public abstract class HBaseTestCase extends TestCase { */ protected static void addContent(final Loader updater, final String column) throws IOException { - addContent(updater, column, - new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null); + addContent(updater, column, START_KEY_BYTES, null); } /** @@ -148,6 +149,23 @@ public abstract class HBaseTestCase extends TestCase { */ protected static void addContent(final Loader updater, final String column, final byte [] startKeyBytes, final Text endKey) + throws IOException { + addContent(updater, column, startKeyBytes, endKey, -1); + } + + /** + * Add content to region r on the passed column + * column. + * Adds data of the form 'aaa', 'aab', etc where key and value are the same. + * @param updater An instance of {@link Loader}.
+ * @param column + * @param startKeyBytes Where to start the rows inserted + * @param endKey Where to stop inserting rows. + * @param ts Timestamp to write the content with. + * @throws IOException + */ + protected static void addContent(final Loader updater, final String column, + final byte [] startKeyBytes, final Text endKey, final long ts) throws IOException { // Add rows of three characters. The first character starts with the // 'a' character and runs up to 'z'. Per first character, we run the @@ -167,7 +185,11 @@ public abstract class HBaseTestCase extends TestCase { long lockid = updater.startBatchUpdate(t); try { updater.put(lockid, new Text(column), bytes); - updater.commit(lockid); + if (ts == -1) { + updater.commit(lockid); + } else { + updater.commit(lockid, ts); + } lockid = -1; } finally { if (lockid != -1) { @@ -190,6 +212,7 @@ public abstract class HBaseTestCase extends TestCase { public long startBatchUpdate(final Text row) throws IOException; public void put(long lockid, Text column, byte val[]) throws IOException; public void commit(long lockid) throws IOException; + public void commit(long lockid, long ts) throws IOException; public void abort(long lockid) throws IOException; } @@ -208,6 +231,9 @@ public abstract class HBaseTestCase extends TestCase { public void commit(long lockid) throws IOException { this.region.commit(lockid, System.currentTimeMillis()); } + public void commit(long lockid, final long ts) throws IOException { + this.region.commit(lockid, ts); + } public void put(long lockid, Text column, byte[] val) throws IOException { this.region.put(lockid, column, val); } @@ -231,6 +257,9 @@ public abstract class HBaseTestCase extends TestCase { public void commit(long lockid) throws IOException { this.table.commit(lockid); } + public void commit(long lockid, final long ts) throws IOException { + this.table.commit(lockid, ts); + } public void put(long lockid, Text column, byte[] val) throws IOException { this.table.put(lockid, column, 
val); } diff --git a/src/test/org/apache/hadoop/hbase/TestTimestamp.java b/src/test/org/apache/hadoop/hbase/TestTimestamp.java index 812b23a9dd0..44e63fbff40 100644 --- a/src/test/org/apache/hadoop/hbase/TestTimestamp.java +++ b/src/test/org/apache/hadoop/hbase/TestTimestamp.java @@ -18,13 +18,18 @@ package org.apache.hadoop.hbase; +import java.io.IOException; import java.util.TreeMap; + import org.apache.hadoop.io.Text; -/** Tests user specifyable time stamps */ -public class TestTimestamp extends HBaseClusterTestCase { +/** + * Tests user specifiable time stamps + */ +public class TestTimestamp extends HBaseTestCase { private static final long T0 = 10L; private static final long T1 = 100L; + private static final long T2 = 200L; private static final String COLUMN_NAME = "contents:"; private static final String TABLE_NAME = "test"; @@ -37,157 +42,226 @@ public class TestTimestamp extends HBaseClusterTestCase { }; private static final Text TABLE = new Text(TABLE_NAME); private static final Text ROW = new Text("row"); - - private HTable table; - - /** constructor */ - public TestTimestamp() { - super(); - } - - /** {@inheritDoc} */ - @Override - public void setUp() throws Exception { - super.setUp(); - - HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); - desc.addFamily(new HColumnDescriptor(COLUMN_NAME)); + /** + * Test that delete works according to description in hadoop-1784 + * when it comes to timestamps. + * @throws IOException + */ + public void testDelete() throws IOException { + HRegion r = createRegion(); try { - HBaseAdmin admin = new HBaseAdmin(conf); - admin.createTable(desc); - - } catch (Exception e) { - e.printStackTrace(); - fail(); + HRegionLoader loader = new HRegionLoader(r); + // Add a couple of values for three different timestamps. 
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0); + addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1); + addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T2); + addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad")); + // If I delete w/o specifying a timestamp, this means I'm deleting the + // latest. + delete(r, System.currentTimeMillis()); + // Verify that I get back T2 through T0. + } finally { + r.close(); + r.getLog().closeAndDelete(); } } - /** the test */ - public void testTimestamp() { + private void delete(final HRegion r, final long ts) throws IOException { + long lockid = r.startUpdate(ROW); + r.delete(lockid, COLUMN); + r.commit(lockid, ts == -1? System.currentTimeMillis(): ts); + } + + /** + * Test scanning against different timestamps. + * @throws IOException + */ + public void testTimestampScanning() throws IOException { + HRegion r = createRegion(); try { - table = new HTable(conf, TABLE); + HRegionLoader loader = new HRegionLoader(r); + // Add a couple of values for three different timestamps. + addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0); + addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1); + addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad")); + // Get count of latest items. + int count = assertScanContentTimestamp(r, System.currentTimeMillis()); + // Assert I get same count when I scan at each timestamp. + assertEquals(count, assertScanContentTimestamp(r, T0)); + assertEquals(count, assertScanContentTimestamp(r, T1)); + // Flush everything out to disk and then retry + r.flushcache(false); + assertEquals(count, assertScanContentTimestamp(r, T0)); + assertEquals(count, assertScanContentTimestamp(r, T1)); + } finally { + r.close(); + r.getLog().closeAndDelete(); + } + } + + /* + * Assert that the scan returns only values <= timestamp. + * @param r + * @param ts + * @return Count of items scanned.
+ * @throws IOException + */ + private int assertScanContentTimestamp(final HRegion r, final long ts) + throws IOException { + int count = 0; + HInternalScannerInterface scanner = + r.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts, null); + try { + HStoreKey key = new HStoreKey(); + TreeMapvalue = new TreeMap(); + while (scanner.next(key, value)) { + assertTrue(key.getTimestamp() <= ts); + Text row = key.getRow(); + assertEquals(row.toString(), + new String(value.get(COLUMN), HConstants.UTF8_ENCODING)); + count++; + value.clear(); + } + } finally { + scanner.close(); + } + return count; + } + + /** + * Basic test of timestamps. + * TODO: Needs rewrite after hadoop-1784 gets fixed. + * @throws IOException + */ + public void testTimestamps() throws IOException { + MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1); + try { + HTable table = createTable(); // store a value specifying an update time - - long lockid = table.startUpdate(ROW); - table.put(lockid, COLUMN, VERSION1.getBytes(HConstants.UTF8_ENCODING)); - table.commit(lockid, T0); + put(table, VERSION1.getBytes(HConstants.UTF8_ENCODING), T0); // store a value specifying 'now' as the update time - - lockid = table.startUpdate(ROW); - table.put(lockid, COLUMN, LATEST.getBytes(HConstants.UTF8_ENCODING)); - table.commit(lockid); + put(table, LATEST.getBytes(HConstants.UTF8_ENCODING), -1); // delete values older than T1 - - lockid = table.startUpdate(ROW); + long lockid = table.startUpdate(ROW); table.delete(lockid, COLUMN); table.commit(lockid, T1); // now retrieve... 
- - // the most recent version: - - byte[] bytes = table.get(ROW, COLUMN); - assertTrue(bytes != null && bytes.length != 0); - assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING))); - - // any version <= time T1 - - byte[][] values = table.get(ROW, COLUMN, T1, 3); - assertNull(values); - - // the version from T0 - - values = table.get(ROW, COLUMN, T0, 3); - assertTrue(values.length == 1 - && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING))); + assertGets(table); // flush everything out to disk - HRegionServer s = cluster.regionThreads.get(0).getRegionServer(); for(HRegion r: s.onlineRegions.values() ) { r.flushcache(false); } // now retrieve... - - // the most recent version: - - bytes = table.get(ROW, COLUMN); - assertTrue(bytes != null && bytes.length != 0); - assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING))); - - // any version <= time T1 - - values = table.get(ROW, COLUMN, T1, 3); - assertNull(values); - - // the version from T0 - - values = table.get(ROW, COLUMN, T0, 3); - assertTrue(values.length == 1 - && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING))); - - // three versions older than now - - values = table.get(ROW, COLUMN, 3); - assertTrue(values.length == 1 - && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING))); + assertGets(table); // Test scanners - - HScannerInterface scanner = - table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW); - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - int count = 0; - while(scanner.next(key, results)) { - count++; - } - assertEquals(count, 1); - assertEquals(results.size(), 1); - - } finally { - scanner.close(); - } - - scanner = table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, T1); - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - int count = 0; - while(scanner.next(key, results)) { - count++; - } - assertEquals(count, 0); - assertEquals(results.size(), 0); - - } 
finally { - scanner.close(); - } - - scanner = table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, T0); - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - int count = 0; - while(scanner.next(key, results)) { - count++; - } - assertEquals(count, 0); - assertEquals(results.size(), 0); - - } finally { - scanner.close(); - } - + assertScanCount(table, -1, 1); + assertScanCount(table, T1, 0); } catch (Exception e) { - e.printStackTrace(); - fail(); + cluster.shutdown(); } } -} + + /* + * Test count of results scanning. + * @param table + * @param ts + * @param expectedCount + * @throws IOException + */ + private void assertScanCount(final HTable table, final long ts, + final int expectedCount) + throws IOException { + HScannerInterface scanner = (ts == -1)? + table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW): + table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts); + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + int count = 0; + while(scanner.next(key, results)) { + count++; + } + assertEquals(count, expectedCount); + assertEquals(results.size(), expectedCount); + + } finally { + scanner.close(); + } + } + + /* + * Test can do basic gets. + * Used by testTimestamp above. 
+ * @param table + * @throws IOException + */ + private void assertGets(final HTable table) throws IOException { + // the most recent version: + byte[] bytes = table.get(ROW, COLUMN); + assertTrue(bytes != null && bytes.length != 0); + assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING))); + + // any version <= time T1 + byte[][] values = table.get(ROW, COLUMN, T1, 3); + assertNull(values); + + // the version from T0 + values = table.get(ROW, COLUMN, T0, 3); + assertTrue(values.length == 1 + && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING))); + + // three versions older than now + values = table.get(ROW, COLUMN, 3); + assertTrue(values.length == 1 + && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING))); + } + + /* + * Put values. + * @param table + * @param bytes + * @param ts + * @throws IOException + */ + private void put(final HTable table, final byte [] bytes, final long ts) + throws IOException { + long lockid = table.startUpdate(ROW); + table.put(lockid, COLUMN, bytes); + if (ts == -1) { + table.commit(lockid); + } else { + table.commit(lockid, ts); + } + } + + /* + * Create a table named TABLE_NAME. + * @return An instance of an HTable connected to the created table. + * @throws IOException + */ + private HTable createTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(COLUMN_NAME)); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + return new HTable(conf, TABLE); + } + + private HRegion createRegion() throws IOException { + HLog hlog = new HLog(this.localFs, this.testDir, this.conf); + HTableDescriptor htd = createTableDescriptor(getName()); + htd.addFamily(new HColumnDescriptor(COLUMN_NAME)); + HRegionInfo hri = new HRegionInfo(1, htd, null, null); + return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + } +} \ No newline at end of file