From 2cc1c9f65b136ae77eb79600e23e5b2b5fa8f0f0 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Mon, 10 Sep 2007 15:56:16 +0000 Subject: [PATCH] HADOOP-1784 Delete: Fix scanners and gets so they work properly in presence of deletes. Added a deleteAll to remove all cells equal to or older than passed timestamp. Fixed compaction so deleted cells do not make it out into compacted output. Ensure also that versions > column max are dropped compacting. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@574287 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 5 + .../apache/hadoop/hbase/HAbstractScanner.java | 4 +- .../org/apache/hadoop/hbase/HConstants.java | 24 +- .../hbase/HInternalScannerInterface.java | 29 +- .../org/apache/hadoop/hbase/HMemcache.java | 95 ++++- src/java/org/apache/hadoop/hbase/HRegion.java | 282 ++++++++++++-- .../apache/hadoop/hbase/HRegionInterface.java | 22 +- .../apache/hadoop/hbase/HRegionServer.java | 64 +-- .../hadoop/hbase/HScannerInterface.java | 7 +- src/java/org/apache/hadoop/hbase/HStore.java | 255 +++++++++--- src/java/org/apache/hadoop/hbase/HTable.java | 78 +++- .../hadoop/hbase/io/BatchOperation.java | 69 ++-- .../apache/hadoop/hbase/io/BatchUpdate.java | 27 +- .../hbase/io/ImmutableBytesWritable.java | 3 + .../apache/hadoop/hbase/HBaseTestCase.java | 122 ++++-- .../apache/hadoop/hbase/MiniHBaseCluster.java | 11 + .../apache/hadoop/hbase/MultiRegionTable.java | 2 +- .../apache/hadoop/hbase/TestCompaction.java | 162 +++++--- .../apache/hadoop/hbase/TestMasterAdmin.java | 43 +-- .../org/apache/hadoop/hbase/TestScanner2.java | 45 ++- .../apache/hadoop/hbase/TestTimestamp.java | 365 ++++++++++-------- 21 files changed, 1208 insertions(+), 506 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 057cbc84475..fb4aa5a9516 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,11 @@ Trunk (unreleased changes) NEW FEATURES HADOOP-1768 FS command using Hadoop FsShell operations (Edward Yoon via Stack) + HADOOP-1784 Delete: Fix scanners and gets so they work properly in presence + of deletes. Added a deleteAll to remove all cells equal to or + older than passed timestamp. Fixed compaction so deleted cells + do not make it out into compacted output. Ensure also that + versions > column max are dropped compacting. 
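
As a quick orientation — a rough usage sketch, not part of the patch — this is roughly what the change looks like from the client side, using the HTable calls the patch adds or reworks (deleteAll, delete plus a timestamp-less commit). The class, table, row, and column names are invented for the example, and the HTable instance is assumed to be already open.

import java.io.IOException;

import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

/** Rough sketch of the delete semantics introduced by HADOOP-1784. */
public class DeleteSemanticsSketch {
  /** @param table an already-open HTable whose schema has an "info:" family */
  public static void demo(final HTable table) throws IOException {
    final Text row = new Text("row_aaa");
    final Text column = new Text("info:status");

    // A delete committed without an explicit timestamp is sent as
    // LATEST_TIMESTAMP and removes only the most recent cell of the column.
    long lockid = table.startUpdate(row);
    table.delete(lockid, column);
    table.commit(lockid);

    // deleteAll removes every cell of the column whose timestamp is equal
    // to or older than the one passed (here: everything up to "now").
    table.deleteAll(row, column, System.currentTimeMillis());
  }
}
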
OPTIMIZATIONS diff --git a/src/java/org/apache/hadoop/hbase/HAbstractScanner.java b/src/java/org/apache/hadoop/hbase/HAbstractScanner.java index 84d68d2469a..cca5d8e0c2f 100644 --- a/src/java/org/apache/hadoop/hbase/HAbstractScanner.java +++ b/src/java/org/apache/hadoop/hbase/HAbstractScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; import java.util.regex.Pattern; @@ -205,7 +206,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { * * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap) */ - public boolean next(HStoreKey key, TreeMap results) + public boolean next(HStoreKey key, SortedMap results) throws IOException { // Find the next row label (and timestamp) Text chosenRow = null; @@ -218,7 +219,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { || (keys[i].getRow().compareTo(chosenRow) < 0) || ((keys[i].getRow().compareTo(chosenRow) == 0) && (keys[i].getTimestamp() > chosenTimestamp)))) { - chosenRow = new Text(keys[i].getRow()); chosenTimestamp = keys[i].getTimestamp(); } diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 127c8d91eb8..17a5fd53539 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -103,7 +103,7 @@ public interface HConstants { // be the first to be reassigned if the server(s) they are being served by // should go down. - /** The root table's name. */ + /** The root table's name.*/ static final Text ROOT_TABLE_NAME = new Text("-ROOT-"); /** The META table's name. */ @@ -139,10 +139,28 @@ public interface HConstants { static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB"); // Other constants - /** used by scanners, etc when they want to start at the beginning of a region */ - static final Text EMPTY_START_ROW = new Text(); + /** + * An empty instance of Text. + */ + static final Text EMPTY_TEXT = new Text(); + + /** + * Used by scanners, etc when they want to start at the beginning of a region + */ + static final Text EMPTY_START_ROW = EMPTY_TEXT; /** When we encode strings, we always specify UTF8 encoding */ static final String UTF8_ENCODING = "UTF-8"; + /** + * Timestamp to use when we want to refer to the latest cell. + * This is the timestamp sent by clients when no timestamp is specified on + * commit. + */ + static final long LATEST_TIMESTAMP = Long.MAX_VALUE; + + /** + * Define for 'return-all-versions'. + */ + static final int ALL_VERSIONS = -1; } diff --git a/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java b/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java index 021612078ac..6832c53e5a1 100644 --- a/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java +++ b/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java @@ -19,11 +19,6 @@ */ package org.apache.hadoop.hbase; -import java.io.IOException; -import java.util.TreeMap; - -import org.apache.hadoop.io.Text; - /** * Internally, we need to be able to determine if the scanner is doing wildcard * column matches (when only a column family is specified or if a column regex @@ -31,29 +26,7 @@ import org.apache.hadoop.io.Text; * specified. If so, we need to ignore the timestamp to ensure that we get all * the family members, as they may have been last updated at different times. 
*/ -public interface HInternalScannerInterface { - - /** - * Grab the next row's worth of values. The HScanner will return the most - * recent data value for each row that is not newer than the target time. - * - * If a dataFilter is defined, it will be used to skip rows that do not - * match its criteria. It may cause the scanner to stop prematurely if it - * knows that it will no longer accept the remaining results. - * - * @param key HStoreKey containing row and timestamp - * @param results Map of column/value pairs - * @return true if a value was found - * @throws IOException - */ - public boolean next(HStoreKey key, TreeMap results) - throws IOException; - - /** - * Close the scanner. - */ - public void close(); - +public interface HInternalScannerInterface extends HScannerInterface { /** @return true if the scanner is matching a column family or regex */ public boolean isWildcardScanner(); diff --git a/src/java/org/apache/hadoop/hbase/HMemcache.java b/src/java/org/apache/hadoop/hbase/HMemcache.java index 47a4db442dc..9a3120c67ea 100644 --- a/src/java/org/apache/hadoop/hbase/HMemcache.java +++ b/src/java/org/apache/hadoop/hbase/HMemcache.java @@ -175,18 +175,15 @@ public class HMemcache { * @return An array of byte arrays ordered by timestamp. */ public byte [][] get(final HStoreKey key, final int numVersions) { - List results = new ArrayList(); this.lock.obtainReadLock(); try { - ArrayList result = - get(memcache, key, numVersions - results.size()); - results.addAll(0, result); + ArrayList results = get(memcache, key, numVersions); for (int i = history.size() - 1; i >= 0; i--) { if (numVersions > 0 && results.size() >= numVersions) { break; } - result = get(history.elementAt(i), key, numVersions - results.size()); - results.addAll(results.size(), result); + results.addAll(results.size(), + get(history.elementAt(i), key, numVersions - results.size())); } return (results.size() == 0)? null: ImmutableBytesWritable.toArray(results); @@ -194,7 +191,6 @@ public class HMemcache { this.lock.releaseReadLock(); } } - /** * Return all the available columns for the given key. The key indicates a @@ -248,7 +244,8 @@ public class HMemcache { * @param map * @param key * @param numVersions - * @return Ordered list of items found in passed map + * @return Ordered list of items found in passed map. If no + * matching values, returns an empty list (does not return null). */ ArrayList get(final TreeMap map, final HStoreKey key, final int numVersions) { @@ -261,15 +258,10 @@ public class HMemcache { for (Map.Entry es: tailMap.entrySet()) { HStoreKey itKey = es.getKey(); if (itKey.matchesRowCol(curKey)) { - if(HGlobals.deleteBytes.compareTo(es.getValue()) == 0) { - // TODO: Shouldn't this be a continue rather than a break? Perhaps - // the intent is that this DELETE_BYTES is meant to suppress older - // info -- see 5.4 Compactions in BigTable -- but how does this jibe - // with being able to remove one version only? - break; + if (!isDeleted(es.getValue())) { + result.add(tailMap.get(itKey)); + curKey.setVersion(itKey.getTimestamp() - 1); } - result.add(tailMap.get(itKey)); - curKey.setVersion(itKey.getTimestamp() - 1); } if (numVersions > 0 && result.size() >= numVersions) { break; @@ -278,6 +270,77 @@ public class HMemcache { return result; } + /** + * Get versions keys matching the origin key's + * row/column/timestamp and those of an older vintage + * Default access so can be accessed out of {@link HRegionServer}. + * @param origin Where to start searching. 
+ * @param versions How many versions to return. Pass + * {@link HConstants.ALL_VERSIONS} to retrieve all. + * @return Ordered list of versions keys going from newest back. + * @throws IOException + */ + List getKeys(final HStoreKey origin, final int versions) { + this.lock.obtainReadLock(); + try { + List results = getKeys(this.memcache, origin, versions); + for (int i = history.size() - 1; i >= 0; i--) { + results.addAll(results.size(), getKeys(history.elementAt(i), origin, + versions == HConstants.ALL_VERSIONS? versions: + (results != null? versions - results.size(): versions))); + } + return results; + } finally { + this.lock.releaseReadLock(); + } + } + + /* + * @param origin Where to start searching. + * @param versions How many versions to return. Pass + * {@link HConstants.ALL_VERSIONS} to retrieve all. + * @return List of all keys that are of the same row and column and of + * equal or older timestamp. If no keys, returns an empty List. Does not + * return null. + */ + private List getKeys(final TreeMap map, + final HStoreKey origin, final int versions) { + List result = new ArrayList(); + SortedMap tailMap = map.tailMap(origin); + for (Map.Entry es: tailMap.entrySet()) { + HStoreKey key = es.getKey(); + if (!key.matchesRowCol(origin)) { + break; + } + if (!isDeleted(es.getValue())) { + result.add(key); + if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) { + // We have enough results. Return. + break; + } + } + } + return result; + } + + /** + * @param key + * @return True if an entry and its content is {@link HGlobals.deleteBytes}. + * Use checking values in store. On occasion the memcache has the fact that + * the cell has been deleted. + */ + boolean isDeleted(final HStoreKey key) { + return isDeleted(this.memcache.get(key)); + } + + /** + * @param value + * @return True if an entry and its content is {@link HGlobals.deleteBytes}. + */ + boolean isDeleted(final byte [] value) { + return (value == null)? false: HGlobals.deleteBytes.compareTo(value) == 0; + } + /** * Return a scanner over the keys in the HMemcache */ diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index a4edfcb42e9..fcce0574cef 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; import java.util.Vector; @@ -581,6 +582,9 @@ public class HRegion implements HConstants { lock.obtainReadLock(); try { HStore.HStoreSize biggest = largestHStore(midKey); + if (biggest == null) { + return false; + } long triggerSize = this.desiredMaxFileSize + (this.desiredMaxFileSize / 2); boolean split = (biggest.getAggregate() >= triggerSize); @@ -911,26 +915,47 @@ public class HRegion implements HConstants { } } - /** Private implementation: get the value for the indicated HStoreKey */ - private byte [][] get(HStoreKey key, int numVersions) throws IOException { - + private byte [][] get(final HStoreKey key, final int numVersions) + throws IOException { lock.obtainReadLock(); try { // Check the memcache - byte [][] result = memcache.get(key, numVersions); - if(result != null) { - return result; + byte [][] memcacheResult = this.memcache.get(key, numVersions); + // If we got sufficient versions from memcache, return. 
+ if (memcacheResult != null && memcacheResult.length == numVersions) { + return memcacheResult; } - // If unavailable in memcache, check the appropriate HStore + // Check hstore for more versions. Text colFamily = HStoreKey.extractFamily(key.getColumn()); HStore targetStore = stores.get(colFamily); if(targetStore == null) { - return null; + // There are no stores. Return what we got from memcache. + return memcacheResult; } - - return targetStore.get(key, numVersions); + // Update the number of versions we need to fetch from the store. + int amendedNumVersions = numVersions; + if (memcacheResult != null) { + amendedNumVersions -= memcacheResult.length; + } + byte [][] result = + targetStore.get(key, amendedNumVersions, this.memcache); + if (result == null) { + result = memcacheResult; + } else if (memcacheResult != null) { + // We have results from both memcache and from stores. Put them + // together in an array in the proper order. + byte [][] storeResult = result; + result = new byte [memcacheResult.length + result.length][]; + for (int i = 0; i < memcacheResult.length; i++) { + result[i] = memcacheResult[i]; + } + for (int i = 0; i < storeResult.length; i++) { + result[i + memcacheResult.length] = storeResult[i]; + } + } + return result; } finally { lock.releaseReadLock(); } @@ -962,6 +987,45 @@ public class HRegion implements HConstants { } } + /** + * Get all keys matching the origin key's row/column/timestamp and those + * of an older vintage + * Default access so can be accessed out of {@link HRegionServer}. + * @param origin Where to start searching. + * @return Ordered list of keys going from newest on back. + * @throws IOException + */ + List getKeys(final HStoreKey origin) throws IOException { + return getKeys(origin, ALL_VERSIONS); + } + + /** + * Get versions keys matching the origin key's + * row/column/timestamp and those of an older vintage + * Default access so can be accessed out of {@link HRegionServer}. + * @param origin Where to start searching. + * @param versions How many versions to return. Pass + * {@link HConstants.ALL_VERSIONS} to retrieve all. + * @return Ordered list of versions keys going from newest back. + * @throws IOException + */ + List getKeys(final HStoreKey origin, final int versions) + throws IOException { + List keys = this.memcache.getKeys(origin, versions); + if (versions != ALL_VERSIONS && keys.size() >= versions) { + return keys; + } + // Check hstore for more versions. + Text colFamily = HStoreKey.extractFamily(origin.getColumn()); + HStore targetStore = stores.get(colFamily); + if (targetStore != null) { + // Pass versions without modification since in the store getKeys, it + // includes the size of the passed keys array when counting. + keys = targetStore.getKeys(origin, keys, versions); + } + return keys; + } + /** * Return an iterator that scans over the HRegion, returning the indicated * columns for only the rows that match the data filter. This Iterator must be closed by the caller. @@ -1110,8 +1174,8 @@ public class HRegion implements HConstants { } /** - * Delete a value or write a value. This is a just a convenience method for put(). - * + * Delete a value or write a value. + * This is a just a convenience method for put(). 
* @param lockid lock id obtained from startUpdate * @param targetCol name of column to be deleted * @throws IOException @@ -1119,6 +1183,51 @@ public class HRegion implements HConstants { public void delete(long lockid, Text targetCol) throws IOException { localput(lockid, targetCol, HGlobals.deleteBytes.get()); } + + /** + * Delete all cells of the same age as the passed timestamp or older. + * @param row + * @param column + * @param ts Delete all entries that have this timestamp or older + * @throws IOException + */ + public void deleteAll(final Text row, final Text column, final long ts) + throws IOException { + deleteMultiple(row, column, ts, ALL_VERSIONS); + } + + /** + * Delete one or many cells. + * Used to support {@link #deleteAll(Text, Text, long)} and deletion of + * latest cell. + * @param row + * @param column + * @param ts Timestamp to start search on. + * @param versions How many versions to delete. Pass + * {@link HConstants.ALL_VERSIONS} to delete all. + * @throws IOException + */ + void deleteMultiple(final Text row, final Text column, final long ts, + final int versions) + throws IOException { + lock.obtainReadLock(); + try { + checkColumn(column); + HStoreKey origin = new HStoreKey(row, column, ts); + synchronized(row) { + List keys = getKeys(origin, versions); + if (keys.size() > 0) { + TreeMap edits = new TreeMap(); + edits.put(column, HGlobals.deleteBytes.get()); + for (HStoreKey key: keys) { + update(row, key.getTimestamp(), edits); + } + } + } + } finally { + lock.releaseReadLock(); + } + } /** * Private implementation. @@ -1202,10 +1311,11 @@ public class HRegion implements HConstants { * Once updates hit the change log, they are safe. They will either be moved * into an HStore in the future, or they will be recovered from the log. * @param lockid Lock for row we're to commit. - * @param timestamp the time to associate with this change + * @param timestamp the time to associate with this change. * @throws IOException */ - public void commit(final long lockid, long timestamp) throws IOException { + public void commit(final long lockid, final long timestamp) + throws IOException { // Remove the row from the pendingWrites list so // that repeated executions won't screw this up. Text row = getRowFromLock(lockid); @@ -1216,19 +1326,75 @@ public class HRegion implements HConstants { // This check makes sure that another thread from the client // hasn't aborted/committed the write-operation synchronized(row) { - // Add updates to the log and add values to the memcache. Long lid = Long.valueOf(lockid); - TreeMap columns = this.targetColumns.get(lid); - if (columns != null && columns.size() > 0) { - log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), - row, columns, timestamp); - memcache.add(row, columns, timestamp); - // OK, all done! - } + update(row, timestamp, this.targetColumns.get(lid)); targetColumns.remove(lid); releaseRowLock(row); } } + + /** + * This method for unit testing only. + * Does each operation individually so can do appropriate + * {@link HConstants#LATEST_TIMESTAMP} action. Tries to mimic how + * {@link HRegionServer#batchUpdate(Text, long, org.apache.hadoop.hbase.io.BatchUpdate)} + * works when passed a timestamp of LATEST_TIMESTAMP. + * @param lockid Lock for row we're to commit. + * @throws IOException + * @throws IOException + * @see {@link #commit(long, long)} + */ + void commit(final long lockid) throws IOException { + // Remove the row from the pendingWrites list so + // that repeated executions won't screw this up. 
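
Aside, not part of the patch: deleteMultiple above does not write a single marker. It first collects the existing cell keys at or older than the requested timestamp via getKeys, then writes HGlobals.deleteBytes at each of those exact timestamps, so every targeted version is shadowed individually. A minimal standalone sketch of that timestamp selection, with invented names and -1 standing in for ALL_VERSIONS:

import java.util.ArrayList;
import java.util.List;

/** Sketch: which cell timestamps a deleteMultiple-style call would shadow. */
public class TombstoneTargetsSketch {
  /**
   * @param existing timestamps of existing cells for one row/column, newest first
   * @param ts       delete cells at this timestamp or older
   * @param versions how many cells to delete; pass -1 (ALL_VERSIONS) for all
   * @return timestamps at which a delete marker would be written
   */
  static List<Long> targets(final List<Long> existing, final long ts,
      final int versions) {
    List<Long> out = new ArrayList<Long>();
    for (long t : existing) {
      if (t > ts) {
        continue;               // newer than the cutoff: left untouched
      }
      out.add(t);               // a tombstone goes in at exactly this timestamp
      if (versions != -1 && out.size() >= versions) {
        break;                  // only the requested number of versions
      }
    }
    return out;
  }
}
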
+ Text row = getRowFromLock(lockid); + if(row == null) { + throw new LockException("No write lock for lockid " + lockid); + } + + // This check makes sure that another thread from the client + // hasn't aborted/committed the write-operation + synchronized(row) { + Long lid = Long.valueOf(lockid); + TreeMap updatesByColumn = this.targetColumns.get(lid); + // Run updates one at a time so we can supply appropriate timestamp + long now = System.currentTimeMillis(); + for (Map.Entrye: updatesByColumn.entrySet()) { + if (HGlobals.deleteBytes.equals(e.getValue())) { + // Its a delete. Delete latest. deleteMultiple calls update for us. + // Actually regets the row lock but since we already have it, should + // be fine. + deleteMultiple(row, e.getKey(), LATEST_TIMESTAMP, 1); + continue; + } + // Must be a 'put'. + TreeMap putEdit = new TreeMap(); + putEdit.put(e.getKey(), e.getValue()); + update(row, now, putEdit); + } + this.targetColumns.remove(lid); + releaseRowLock(row); + } + } + + /* + * Add updates to the log and add values to the memcache. + * Warning: Assumption is caller has lock on passed in row. + * @param row Row to update. + * @param timestamp Timestamp to record the updates against + * @param updatesByColumn Cell updates by column + * @throws IOException + */ + private void update(final Text row, final long timestamp, + final TreeMap updatesByColumn) + throws IOException { + if (updatesByColumn == null || updatesByColumn.size() <= 0) { + return; + } + this.log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), + row, updatesByColumn, timestamp); + this.memcache.add(row, updatesByColumn, timestamp); + } ////////////////////////////////////////////////////////////////////////////// // Support code @@ -1250,7 +1416,11 @@ public class HRegion implements HConstants { } } - /** Make sure this is a valid column for the current table */ + /** + * Make sure this is a valid column for the current table + * @param columnName + * @throws IOException + */ void checkColumn(Text columnName) throws IOException { Text family = new Text(HStoreKey.extractFamily(columnName) + ":"); if(! regionInfo.tableDesc.hasFamily(family)) { @@ -1359,10 +1529,6 @@ public class HRegion implements HConstants { dataFilter.reset(); } this.scanners = new HInternalScannerInterface[stores.length + 1]; - for(int i = 0; i < this.scanners.length; i++) { - this.scanners[i] = null; - } - this.resultSets = new TreeMap[scanners.length]; this.keys = new HStoreKey[scanners.length]; this.wildcardMatch = false; @@ -1424,12 +1590,11 @@ public class HRegion implements HConstants { public boolean isMultipleMatchScanner() { return multipleMatchers; } - - /** - * {@inheritDoc} - */ - public boolean next(HStoreKey key, TreeMap results) + + public boolean next(HStoreKey key, SortedMap results) throws IOException { + // Filtered flag is set by filters. If a cell has been 'filtered out' + // -- i.e. it is not to be returned to the caller -- the flag is 'true'. boolean filtered = true; boolean moreToFollow = true; while (filtered && moreToFollow) { @@ -1446,19 +1611,27 @@ public class HRegion implements HConstants { chosenTimestamp = keys[i].getTimestamp(); } } - + // Filter whole row by row key? filtered = dataFilter != null? dataFilter.filter(chosenRow) : false; // Store the key and results for each sub-scanner. Merge them as // appropriate. 
- if (chosenTimestamp > 0 && !filtered) { + if (chosenTimestamp >= 0 && !filtered) { + // Here we are setting the passed in key with current row+timestamp key.setRow(chosenRow); key.setVersion(chosenTimestamp); - key.setColumn(new Text("")); - + key.setColumn(HConstants.EMPTY_TEXT); + // Keep list of deleted cell keys within this row. We need this + // because as we go through scanners, the delete record may be in an + // early scanner and then the same record with a non-delete, non-null + // value in a later. Without history of what we've seen, we'll return + // deleted values. This List should not ever grow too large since we + // are only keeping rows and columns that match those set on the + // scanner and which have delete values. If memory usage becomes a + // problem, could redo as bloom filter. + List deletes = new ArrayList(); for (int i = 0; i < scanners.length && !filtered; i++) { - while ((scanners[i] != null && !filtered && moreToFollow) @@ -1481,8 +1654,19 @@ public class HRegion implements HConstants { // but this had the effect of overwriting newer // values with older ones. So now we only insert // a result if the map does not contain the key. + HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT, + key.getTimestamp()); for (Map.Entry e : resultSets[i].entrySet()) { - if (!filtered && moreToFollow && + hsk.setColumn(e.getKey()); + if (HGlobals.deleteBytes.equals(e.getValue())) { + if (!deletes.contains(hsk)) { + // Key changes as we cycle the for loop so add a copy to + // the set of deletes. + deletes.add(new HStoreKey(hsk)); + } + } else if (!deletes.contains(hsk) && + !filtered && + moreToFollow && !results.containsKey(e.getKey())) { if (dataFilter != null) { // Filter whole row by column data? @@ -1496,7 +1680,6 @@ public class HRegion implements HConstants { results.put(e.getKey(), e.getValue()); } } - resultSets[i].clear(); if (!scanners[i].next(keys[i], resultSets[i])) { closeScanner(i); @@ -1516,8 +1699,8 @@ public class HRegion implements HConstants { } } } - - moreToFollow = chosenTimestamp > 0; + + moreToFollow = chosenTimestamp >= 0; if (dataFilter != null) { if (moreToFollow) { @@ -1533,6 +1716,17 @@ public class HRegion implements HConstants { LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered); } } + + if (results.size() <= 0 && !filtered) { + // There were no results found for this row. Marked it as + // 'filtered'-out otherwise we will not move on to the next row. + filtered = true; + } + } + + // If we got no results, then there is no more to follow. 
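
Aside, not part of the patch: the deletes list kept above is what stops a delete marker found in a newer sub-scanner from letting an older store leak the same cell back into the row's results. A simplified, self-contained sketch of that merge — plain maps keyed by column name, a zero-length value standing in for deleteBytes, and invented class names; the real code keys deletes by full HStoreKey rather than by column:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: merge per-store results for one row, newest store first,
 *  suppressing cells already seen as delete markers. */
public class RowMergeSketch {
  static Map<String, byte[]> merge(final List<Map<String, byte[]>> newestFirst) {
    Map<String, byte[]> results = new LinkedHashMap<String, byte[]>();
    List<String> deletes = new ArrayList<String>();  // columns seen deleted
    for (Map<String, byte[]> store : newestFirst) {
      for (Map.Entry<String, byte[]> e : store.entrySet()) {
        if (e.getValue().length == 0) {
          deletes.add(e.getKey());                   // remember, never emit
        } else if (!deletes.contains(e.getKey())
            && !results.containsKey(e.getKey())) {
          results.put(e.getKey(), e.getValue());     // newest value wins
        }
      }
    }
    return results;
  }
}
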
+ if (results == null || results.size() <= 0) { + moreToFollow = false; } // Make sure scanners closed if no more results @@ -1551,7 +1745,11 @@ public class HRegion implements HConstants { /** Shut down a single scanner */ void closeScanner(int i) { try { - scanners[i].close(); + try { + scanners[i].close(); + } catch (IOException e) { + LOG.warn("Failed closeing scanner " + i, e); + } } finally { scanners[i] = null; keys[i] = null; diff --git a/src/java/org/apache/hadoop/hbase/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/HRegionInterface.java index 44b7789cb69..caaff45cc55 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInterface.java @@ -38,8 +38,8 @@ public interface HRegionInterface extends VersionedProtocol { /** * Get metainfo about an HRegion * - * @param regionName - name of the region - * @return - HRegionInfo object for region + * @param regionName name of the region + * @return HRegionInfo object for region * @throws NotServingRegionException */ public HRegionInfo getRegionInfo(final Text regionName) @@ -69,7 +69,7 @@ public interface HRegionInterface extends VersionedProtocol { * @throws IOException */ public byte [][] get(final Text regionName, final Text row, - final Text column, final int numVersions) + final Text column, final int numVersions) throws IOException; /** @@ -107,7 +107,21 @@ public interface HRegionInterface extends VersionedProtocol { * @param b BatchUpdate * @throws IOException */ - public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException; + public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) + throws IOException; + + /** + * Delete all cells that match the passed row and column and whose + * timestamp is equal-to or older than the passed timestamp. + * + * @param regionName region name + * @param row row key + * @param column column key + * @param timestamp Delete all entries that have this timestamp or older + * @throws IOException + */ + public void deleteAll(Text regionName, Text row, Text column, long timestamp) + throws IOException; // // remote scanner interface diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 48c6b70a8b4..8ea200f0291 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.SortedMap; @@ -1075,22 +1076,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { leases.renewLease(scannerId, scannerId); // Collect values to be returned here - MapWritable values = new MapWritable(); - - // Keep getting rows until we find one that has at least one non-deleted column value - HStoreKey key = new HStoreKey(); TreeMap results = new TreeMap(); while (s.next(key, results)) { for(Map.Entry e: results.entrySet()) { - HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()); - byte [] val = e.getValue(); - if (HGlobals.deleteBytes.compareTo(val) == 0) { - // Column value is deleted. Don't return it. 
- continue; - } - values.put(k, new ImmutableBytesWritable(val)); + values.put(new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()), + new ImmutableBytesWritable(e.getValue())); } if(values.size() > 0) { @@ -1099,7 +1091,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } // No data for this row, go get another. - results.clear(); } return values; @@ -1110,26 +1101,46 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } } - /** {@inheritDoc} */ public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) - throws IOException { - + throws IOException { requestCount.incrementAndGet(); + // If timestamp == LATEST_TIMESTAMP and we have deletes, then they need + // special treatment. For these we need to first find the latest cell so + // when we write the delete, we write it with the latest cells' timestamp + // so the delete record overshadows. This means deletes and puts do not + // happen within the same row lock. + List deletes = null; try { long lockid = startUpdate(regionName, b.getRow()); for(BatchOperation op: b) { switch(op.getOp()) { - case BatchOperation.PUT_OP: + case PUT: put(regionName, lockid, op.getColumn(), op.getValue()); break; - case BatchOperation.DELETE_OP: - delete(regionName, lockid, op.getColumn()); + case DELETE: + if (timestamp == LATEST_TIMESTAMP) { + // Save off these deletes. + if (deletes == null) { + deletes = new ArrayList(); + } + deletes.add(op.getColumn()); + } else { + delete(regionName, lockid, op.getColumn()); + } break; } } - commit(regionName, lockid, timestamp); + commit(regionName, lockid, + (timestamp == LATEST_TIMESTAMP)? System.currentTimeMillis(): timestamp); + if (deletes != null && deletes.size() > 0) { + // We have some LATEST_TIMESTAMP deletes to run. 
+ HRegion r = getRegion(regionName); + for (Text column: deletes) { + r.deleteMultiple(b.getRow(), column, LATEST_TIMESTAMP, 1); + } + } } catch (IOException e) { checkFileSystem(); throw e; @@ -1158,7 +1169,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } leases.createLease(scannerId, scannerId, new ScannerListener(scannerName)); return scannerId; - } catch (IOException e) { if (e instanceof RemoteException) { try { @@ -1217,7 +1227,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { s = scanners.remove(this.scannerName); } if (s != null) { - s.close(); + try { + s.close(); + } catch (IOException e) { + LOG.error("Closing scanner", e); + } } } } @@ -1241,10 +1255,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { protected void delete(Text regionName, long lockid, Text column) throws IOException { - HRegion region = getRegion(regionName); region.delete(lockid, column); } + + public void deleteAll(final Text regionName, final Text row, + final Text column, final long timestamp) + throws IOException { + HRegion region = getRegion(regionName); + region.deleteAll(row, column, timestamp); + } protected void commit(Text regionName, final long lockid, final long timestamp) throws IOException { diff --git a/src/java/org/apache/hadoop/hbase/HScannerInterface.java b/src/java/org/apache/hadoop/hbase/HScannerInterface.java index 2a0f9b499ce..605c540ab57 100644 --- a/src/java/org/apache/hadoop/hbase/HScannerInterface.java +++ b/src/java/org/apache/hadoop/hbase/HScannerInterface.java @@ -30,9 +30,12 @@ import java.util.SortedMap; */ public interface HScannerInterface { /** - * Get the next set of values + * Grab the next row's worth of values. The scanner will return the most + * recent data value for each row that is not newer than the target time + * passed when the scanner was created. * @param key will contain the row and timestamp upon return - * @param results will contain an entry for each column family member and its value + * @param results will contain an entry for each column family member and its + * value * @return true if data was returned * @throws IOException */ diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java index 3907672182b..5236bc2bbcd 100644 --- a/src/java/org/apache/hadoop/hbase/HStore.java +++ b/src/java/org/apache/hadoop/hbase/HStore.java @@ -24,6 +24,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -86,7 +89,14 @@ class HStore implements HConstants { final HLocking lock = new HLocking(); + /* Sorted Map of readers keyed by sequence id (Most recent should be last in + * in list). + */ TreeMap storefiles = new TreeMap(); + + /* Sorted Map of readers keyed by sequence id (Most recent should be last in + * in list). + */ TreeMap readers = new TreeMap(); Random rand = new Random(); @@ -176,7 +186,7 @@ class HStore implements HConstants { // MapFiles are in a reliable state. Every entry in 'mapdir' must have a // corresponding one in 'loginfodir'. Without a corresponding log info // file, the entry in 'mapdir' must be deleted. 
- Vector hstoreFiles + Collection hstoreFiles = HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs); for(HStoreFile hsf: hstoreFiles) { this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf); @@ -446,30 +456,23 @@ class HStore implements HConstants { MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, this.bloomFilter); - // hbase.hstore.compact.on.flush=true enables picking up an existing - // HStoreFIle from disk interlacing the memcache flush compacting as we - // go. The notion is that interlacing would take as long as a pure - // flush with the added benefit of having one less file in the store. - // Experiments show that it takes two to three times the amount of time - // flushing -- more column families makes it so the two timings come - // closer together -- but it also complicates the flush. Disabled for - // now. Needs work picking which file to interlace (favor references - // first, etc.) + // Here we tried picking up an existing HStoreFile from disk and + // interlacing the memcache flush compacting as we go. The notion was + // that interlacing would take as long as a pure flush with the added + // benefit of having one less file in the store. Experiments showed that + // it takes two to three times the amount of time flushing -- more column + // families makes it so the two timings come closer together -- but it + // also complicates the flush. The code was removed. Needed work picking + // which file to interlace (favor references first, etc.) // // Related, looks like 'merging compactions' in BigTable paper interlaces // a memcache flush. We don't. try { - if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) && - this.storefiles.size() > 0) { - compact(out, inputCache.entrySet().iterator(), - this.readers.get(this.storefiles.firstKey())); - } else { - for (Map.Entry es: inputCache.entrySet()) { - HStoreKey curkey = es.getKey(); - if (this.familyName. - equals(HStoreKey.extractFamily(curkey.getColumn()))) { - out.append(curkey, new ImmutableBytesWritable(es.getValue())); - } + for (Map.Entry es: inputCache.entrySet()) { + HStoreKey curkey = es.getKey(); + if (this.familyName. + equals(HStoreKey.extractFamily(curkey.getColumn()))) { + out.append(curkey, new ImmutableBytesWritable(es.getValue())); } } } finally { @@ -546,7 +549,6 @@ class HStore implements HConstants { * * We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. - * * @throws IOException */ void compact() throws IOException { @@ -564,6 +566,8 @@ class HStore implements HConstants { * @param maxSeenSeqID We may have already calculated the maxSeenSeqID. If * so, pass it here. Otherwise, pass -1 and it will be calculated inside in * this method. + * @param deleteSequenceInfo + * @param maxSeenSeqID * @throws IOException */ void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID) @@ -584,7 +588,7 @@ class HStore implements HConstants { } } try { - Vector toCompactFiles = getFilesToCompact(); + List toCompactFiles = getFilesToCompact(); HStoreFile compactedOutputFile = new HStoreFile(conf, this.compactionDir, regionName, familyName, -1); if (toCompactFiles.size() < 1 || @@ -664,17 +668,21 @@ class HStore implements HConstants { } /* - * @return list of files to compact + * @return list of files to compact sorted so most recent comes first. 
*/ - private Vector getFilesToCompact() { - Vector toCompactFiles = null; + private List getFilesToCompact() { + List filesToCompact = null; this.lock.obtainWriteLock(); try { - toCompactFiles = new Vector(storefiles.values()); + // Storefiles are keyed by sequence id. The oldest file comes first. + // We need to return out of here a List that has the newest file as + // first. + filesToCompact = new ArrayList(this.storefiles.values()); + Collections.reverse(filesToCompact); } finally { this.lock.releaseWriteLock(); } - return toCompactFiles; + return filesToCompact; } /* @@ -694,7 +702,7 @@ class HStore implements HConstants { * @throws IOException */ void compact(final MapFile.Writer compactedOut, - final Vector toCompactFiles) + final List toCompactFiles) throws IOException { int size = toCompactFiles.size(); CompactionReader[] rdrs = new CompactionReader[size]; @@ -842,8 +850,14 @@ class HStore implements HConstants { int timesSeen = 0; Text lastRow = new Text(); Text lastColumn = new Text(); - while(numDone < done.length) { - // Find the reader with the smallest key + // Map of a row deletes keyed by column with a list of timestamps for value + Map> deletes = null; + while (numDone < done.length) { + // Find the reader with the smallest key. If two files have same key + // but different values -- i.e. one is delete and other is non-delete + // value -- we will find the first, the one that was written later and + // therefore the one whose value should make it out to the compacted + // store file. int smallestKey = -1; for(int i = 0; i < rdrs.length; i++) { if(done[i]) { @@ -865,24 +879,23 @@ class HStore implements HConstants { timesSeen++; } else { timesSeen = 1; + // We are on to a new row. Create a new deletes list. + deletes = new HashMap>(); } - if(timesSeen <= family.getMaxVersions()) { + byte [] value = (vals[smallestKey] == null)? + null: vals[smallestKey].get(); + if (!isDeleted(sk, value, null, deletes) && + timesSeen <= family.getMaxVersions()) { // Keep old versions until we have maxVersions worth. // Then just skip them. - if(sk.getRow().getLength() != 0 - && sk.getColumn().getLength() != 0) { + if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) { // Only write out objects which have a non-zero length key and // value compactedOut.append(sk, vals[smallestKey]); } } - // TODO: I don't know what to do about deleted values. I currently - // include the fact that the item was deleted as a legitimate - // "version" of the data. Maybe it should just drop the deleted - // val? - // Update last-seen items lastRow.set(sk.getRow()); lastColumn.set(sk.getColumn()); @@ -899,6 +912,52 @@ class HStore implements HConstants { } } + /* + * Check if this is cell is deleted. + * If a memcache and a deletes, check key does not have an entry filled. + * Otherwise, check value is not the HGlobals.deleteBytes value. + * If passed value IS deleteBytes, then it is added to the passed + * deletes map. + * @param hsk + * @param value + * @param memcache Can be null. + * @param deletes Map keyed by column with a value of timestamp. Can be null. + * If non-null and passed value is HGlobals.deleteBytes, then we add to this + * map. + * @return True if this is a deleted cell. Adds the passed deletes map if + * passed value is HGlobals.deleteBytes. + */ + private boolean isDeleted(final HStoreKey hsk, final byte [] value, + final HMemcache memcache, final Map> deletes) { + if (memcache != null && memcache.isDeleted(hsk)) { + return true; + } + List timestamps = (deletes == null)? 
+ null: deletes.get(hsk.getColumn()); + if (timestamps != null && + timestamps.contains(Long.valueOf(hsk.getTimestamp()))) { + return true; + } + if (value == null) { + // If a null value, shouldn't be in here. Mark it as deleted cell. + return true; + } + if (!HGlobals.deleteBytes.equals(value)) { + return false; + } + // Cell has delete value. Save it into deletes. + if (deletes != null) { + if (timestamps == null) { + timestamps = new ArrayList(); + deletes.put(hsk.getColumn(), timestamps); + } + // We know its not already in the deletes array else we'd have returned + // earlier so no need to test if timestamps already has this value. + timestamps.add(Long.valueOf(hsk.getTimestamp())); + } + return true; + } + /* * It's assumed that the compactLock will be acquired prior to calling this * method! Otherwise, it is not thread-safe! @@ -1061,22 +1120,37 @@ class HStore implements HConstants { * previous 'numVersions-1' values, as well. * * If 'numVersions' is negative, the method returns all available versions. + * @param key + * @param numVersions Number of versions to fetch. Must be > 0. + * @param memcache Checked for deletions + * @return + * @throws IOException */ - byte [][] get(HStoreKey key, int numVersions) throws IOException { + byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache) + throws IOException { if (numVersions <= 0) { throw new IllegalArgumentException("Number of versions must be > 0"); } List results = new ArrayList(); + // Keep a list of deleted cell keys. We need this because as we go through + // the store files, the cell with the delete marker may be in one file and + // the old non-delete cell value in a later store file. If we don't keep + // around the fact that the cell was deleted in a newer record, we end up + // returning the old value if user is asking for more than one version. + // This List of deletes should not large since we are only keeping rows + // and columns that match those set on the scanner and which have delete + // values. If memory usage becomes an issue, could redo as bloom filter. + Map> deletes = new HashMap>(); + // This code below is very close to the body of the getKeys method. this.lock.obtainReadLock(); try { MapFile.Reader[] maparray = getReaders(); for(int i = maparray.length - 1; i >= 0; i--) { MapFile.Reader map = maparray[i]; - synchronized(map) { - ImmutableBytesWritable readval = new ImmutableBytesWritable(); map.reset(); + ImmutableBytesWritable readval = new ImmutableBytesWritable(); HStoreKey readkey = (HStoreKey)map.getClosest(key, readval); if (readkey == null) { // map.getClosest returns null if the passed key is > than the @@ -1085,27 +1159,31 @@ class HStore implements HConstants { // BEFORE. continue; } - if (readkey.matchesRowCol(key)) { - if(readval.equals(HGlobals.deleteBytes)) { + if (!readkey.matchesRowCol(key)) { + continue; + } + if (!isDeleted(readkey, readval.get(), memcache, deletes)) { + results.add(readval.get()); + // Perhaps only one version is wanted. I could let this + // test happen later in the for loop test but it would cost + // the allocation of an ImmutableBytesWritable. 
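
Aside, not part of the patch: both the compaction loop and this get path consult a per-column record of tombstone timestamps, so a delete marker met in a newer file also hides the matching cell found later in an older file, and compaction additionally stops emitting versions past the family's maximum. A self-contained sketch of that filtering over one row/column, cells newest first, with an invented Cell type and a zero-length value standing in for deleteBytes:

import java.util.ArrayList;
import java.util.List;

/** Sketch: which versions of one row/column survive a compaction pass. */
public class CompactionKeepSketch {
  static class Cell {
    final long timestamp;
    final byte[] value;      // zero length stands in for a delete marker
    Cell(long ts, byte[] v) { this.timestamp = ts; this.value = v; }
    boolean isDelete() { return value.length == 0; }
  }

  /** @param newestFirst cells for one row/column, newest first across files */
  static List<Cell> keep(final List<Cell> newestFirst, final int maxVersions) {
    List<Long> deletedTimestamps = new ArrayList<Long>();
    List<Cell> out = new ArrayList<Cell>();
    int seen = 0;
    for (Cell c : newestFirst) {
      seen++;
      if (c.isDelete()) {
        deletedTimestamps.add(c.timestamp);  // remember it; the marker itself
        continue;                            // is not written to the output
      }
      if (deletedTimestamps.contains(c.timestamp)) {
        continue;                            // shadowed by a newer delete
      }
      if (seen > maxVersions) {
        continue;                            // over the column's version limit
      }
      out.add(c);
    }
    return out;
  }
}
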
+ if (hasEnoughVersions(numVersions, results)) { break; } - results.add(readval.get()); - readval = new ImmutableBytesWritable(); - while(map.next(readkey, readval) && readkey.matchesRowCol(key)) { - if ((numVersions > 0 && (results.size() >= numVersions)) - || readval.equals(HGlobals.deleteBytes)) { - break; - } + } + while ((readval = new ImmutableBytesWritable()) != null && + map.next(readkey, readval) && + readkey.matchesRowCol(key) && + !hasEnoughVersions(numVersions, results)) { + if (!isDeleted(readkey, readval.get(), memcache, deletes)) { results.add(readval.get()); - readval = new ImmutableBytesWritable(); } } } - if(results.size() >= numVersions) { + if (hasEnoughVersions(numVersions, results)) { break; } } - return results.size() == 0 ? null : ImmutableBytesWritable.toArray(results); } finally { @@ -1113,6 +1191,75 @@ class HStore implements HConstants { } } + private boolean hasEnoughVersions(final int numVersions, + final List results) { + return numVersions > 0 && results.size() >= numVersions; + } + + /** + * Get versions keys matching the origin key's + * row/column/timestamp and those of an older vintage + * Default access so can be accessed out of {@link HRegionServer}. + * @param origin Where to start searching. + * @param versions How many versions to return. Pass + * {@link HConstants.ALL_VERSIONS} to retrieve all. Versions will include + * size of passed allKeys in its count. + * @param allKeys List of keys prepopulated by keys we found in memcache. + * This method returns this passed list with all matching keys found in + * stores appended. + * @return The passed allKeys with versions of + * matching keys found in store files appended. + * @throws IOException + */ + List getKeys(final HStoreKey origin, List allKeys, + final int versions) + throws IOException { + if (allKeys == null) { + allKeys = new ArrayList(); + } + // This code below is very close to the body of the get method. + this.lock.obtainReadLock(); + try { + MapFile.Reader[] maparray = getReaders(); + for(int i = maparray.length - 1; i >= 0; i--) { + MapFile.Reader map = maparray[i]; + synchronized(map) { + map.reset(); + ImmutableBytesWritable readval = new ImmutableBytesWritable(); + HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval); + if (readkey == null) { + // map.getClosest returns null if the passed key is > than the + // last key in the map file. getClosest is a bit of a misnomer + // since it returns exact match or the next closest key AFTER not + // BEFORE. + continue; + } + if (!readkey.matchesRowCol(origin)) { + continue; + } + if (!isDeleted(readkey, readval.get(), null, null) && + !allKeys.contains(readkey)) { + allKeys.add(new HStoreKey(readkey)); + } + while ((readval = new ImmutableBytesWritable()) != null && + map.next(readkey, readval) && + readkey.matchesRowCol(origin)) { + if (!isDeleted(readkey, readval.get(), null, null) && + !allKeys.contains(readkey)) { + allKeys.add(new HStoreKey(readkey)); + if (versions != ALL_VERSIONS && allKeys.size() >= versions) { + break; + } + } + } + } + } + return allKeys; + } finally { + this.lock.releaseReadLock(); + } + } + /* * Data structure to hold result of a look at store file sizes. 
*/ diff --git a/src/java/org/apache/hadoop/hbase/HTable.java b/src/java/org/apache/hadoop/hbase/HTable.java index 65cb1e4c7a2..aa6cd65359e 100644 --- a/src/java/org/apache/hadoop/hbase/HTable.java +++ b/src/java/org/apache/hadoop/hbase/HTable.java @@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -531,10 +530,11 @@ public class HTable implements HConstants { } /** - * Delete the value for a column - * - * @param lockid - lock id returned from startUpdate - * @param column - name of column whose value is to be deleted + * Delete the value for a column. + * Deletes the cell whose row/column/commit-timestamp match those of the + * delete. + * @param lockid lock id returned from startUpdate + * @param column name of column whose value is to be deleted */ public void delete(long lockid, Text column) { checkClosed(); @@ -542,10 +542,60 @@ public class HTable implements HConstants { batch.get().delete(lockid, column); } + /** + * Delete all values for a column + * + * @param row Row to update + * @param column name of column whose value is to be deleted + * @throws IOException + */ + public void deleteAll(final Text row, final Text column) throws IOException { + deleteAll(row, column, LATEST_TIMESTAMP); + } + + /** + * Delete all values for a column + * + * @param row Row to update + * @param column name of column whose value is to be deleted + * @param ts Delete all cells of the same timestamp or older. + * @throws IOException + */ + public void deleteAll(final Text row, final Text column, final long ts) + throws IOException { + checkClosed(); + for(int tries = 0; tries < numRetries; tries++) { + HRegionLocation r = getRegionLocation(row); + HRegionInterface server = + connection.getHRegionConnection(r.getServerAddress()); + try { + server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts); + break; + + } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + if (tries == numRetries - 1) { + throw e; + } + if (LOG.isDebugEnabled()) { + LOG.debug("reloading table servers because: " + e.getMessage()); + } + tableServers = connection.reloadTableServers(tableName); + } + try { + Thread.sleep(this.pause); + } catch (InterruptedException x) { + // continue + } + } + } + /** * Abort a row mutation * - * @param lockid - lock id returned from startUpdate + * @param lockid lock id returned from startUpdate */ public synchronized void abort(long lockid) { checkClosed(); @@ -558,24 +608,26 @@ public class HTable implements HConstants { /** * Finalize a row mutation - * - * @param lockid - lock id returned from startUpdate + * When this method is specified, we pass the server a value that says use + * the 'latest' timestamp. If we are doing a put, on the server-side, cells + * will be given the servers's current timestamp. If the we are commiting + * deletes, then delete removes the most recently modified cell of stipulated + * column. 
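
Aside, not part of the patch: the practical difference between the two commit overloads described in the javadoc above is that an explicit timestamp pins the cell's version, while the timestamp-less form sends LATEST_TIMESTAMP and lets the region server stamp the cell with its own clock. A small usage sketch with invented row/column names, assuming an already-open HTable:

import java.io.IOException;

import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

/** Sketch: explicit vs server-assigned timestamps at commit time. */
public class CommitTimestampSketch {
  static void demo(final HTable table) throws IOException {
    final Text row = new Text("row_bbb");
    final Text column = new Text("info:status");

    // Explicit timestamp: the cell is stored at exactly this version.
    long lockid = table.startUpdate(row);
    table.put(lockid, column, "old".getBytes("UTF-8"));
    table.commit(lockid, 1000L);

    // No timestamp: LATEST_TIMESTAMP is sent and the region server stamps
    // the cell with its current time, making it the newest version.
    lockid = table.startUpdate(row);
    table.put(lockid, column, "new".getBytes("UTF-8"));
    table.commit(lockid);
  }
}
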
+ * @param lockid lock id returned from startUpdate * @throws IOException */ public void commit(long lockid) throws IOException { - commit(lockid, System.currentTimeMillis()); + commit(lockid, LATEST_TIMESTAMP); } /** * Finalize a row mutation - * - * @param lockid - lock id returned from startUpdate - * @param timestamp - time to associate with the change + * @param lockid lock id returned from startUpdate + * @param timestamp time to associate with the change * @throws IOException */ public synchronized void commit(long lockid, long timestamp) throws IOException { - checkClosed(); updateInProgress(true); if (batch.get().getLockid() != lockid) { diff --git a/src/java/org/apache/hadoop/hbase/io/BatchOperation.java b/src/java/org/apache/hadoop/hbase/io/BatchOperation.java index e20009d7959..8e0a934d737 100644 --- a/src/java/org/apache/hadoop/hbase/io/BatchOperation.java +++ b/src/java/org/apache/hadoop/hbase/io/BatchOperation.java @@ -27,24 +27,40 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; /** - * batch update operation + * Batch update operations such as put, delete, and deleteAll. */ public class BatchOperation implements Writable { - /** put operation */ - public static final int PUT_OP = 1; - - /** delete operation */ - public static final int DELETE_OP = 2; - - private int op; + /** + * Operation types. + * @see org.apache.hadoop.io.SequenceFile.Writer + */ + public static enum Operation {PUT, DELETE} + + private Operation op; private Text column; private byte[] value; /** default constructor used by Writable */ public BatchOperation() { - this.op = 0; - this.column = new Text(); - this.value = null; + this(new Text()); + } + /** + * Creates a DELETE operation + * + * @param column column name + */ + public BatchOperation(final Text column) { + this(Operation.DELETE, column, null); + } + + /** + * Creates a PUT operation + * + * @param column column name + * @param value column value + */ + public BatchOperation(final Text column, final byte [] value) { + this(Operation.PUT, column, value); } /** @@ -53,22 +69,12 @@ public class BatchOperation implements Writable { * @param column column name * @param value column value */ - public BatchOperation(Text column, byte[] value) { - this.op = PUT_OP; + public BatchOperation(final Operation operation, final Text column, + final byte[] value) { + this.op = operation; this.column = column; this.value = value; } - - /** - * Creates a delete operation - * - * @param column name of column to delete - */ - public BatchOperation(Text column) { - this.op = DELETE_OP; - this.column = column; - this.value = null; - } /** * @return the column @@ -80,8 +86,8 @@ public class BatchOperation implements Writable { /** * @return the operation */ - public int getOp() { - return op; + public Operation getOp() { + return this.op; } /** @@ -99,9 +105,10 @@ public class BatchOperation implements Writable { * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { - op = in.readInt(); + int ordinal = in.readInt(); + this.op = Operation.values()[ordinal]; column.readFields(in); - if(op == PUT_OP) { + if (this.op == Operation.PUT) { value = new byte[in.readInt()]; in.readFully(value); } @@ -111,11 +118,11 @@ public class BatchOperation implements Writable { * {@inheritDoc} */ public void write(DataOutput out) throws IOException { - out.writeInt(op); + out.writeInt(this.op.ordinal()); column.write(out); - if(op == PUT_OP) { + if (this.op == Operation.PUT) { out.writeInt(value.length); out.write(value); } } -} +} \ No 
newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java index cdb994f242d..006db9a8103 100644 --- a/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java +++ b/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java @@ -61,7 +61,7 @@ public class BatchUpdate implements Writable, Iterable { */ public BatchUpdate(long lockid) { this.row = new Text(); - this.lockid = Long.valueOf(Math.abs(lockid)); + this.lockid = Math.abs(lockid); this.operations = new ArrayList(); } @@ -97,27 +97,28 @@ public class BatchUpdate implements Writable, Iterable { /** * Change a value for the specified column * - * @param lockid - lock id returned from startUpdate - * @param column - column whose value is being set - * @param val - new value for column + * @param lid lock id returned from startUpdate + * @param column column whose value is being set + * @param val new value for column */ - public synchronized void put(final long lockid, final Text column, + public synchronized void put(final long lid, final Text column, final byte val[]) { - if(this.lockid != lockid) { - throw new IllegalArgumentException("invalid lockid " + lockid); + if(this.lockid != lid) { + throw new IllegalArgumentException("invalid lockid " + lid); } operations.add(new BatchOperation(column, val)); } /** * Delete the value for a column - * - * @param lockid - lock id returned from startUpdate - * @param column - name of column whose value is to be deleted + * Deletes the cell whose row/column/commit-timestamp match those of the + * delete. + * @param lid lock id returned from startUpdate + * @param column name of column whose value is to be deleted */ - public synchronized void delete(final long lockid, final Text column) { - if(this.lockid != lockid) { - throw new IllegalArgumentException("invalid lockid " + lockid); + public synchronized void delete(final long lid, final Text column) { + if(this.lockid != lid) { + throw new IllegalArgumentException("invalid lockid " + lid); } operations.add(new BatchOperation(column)); } diff --git a/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java b/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java index 3f90032df53..51492ab8419 100644 --- a/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java +++ b/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java @@ -153,6 +153,9 @@ public class ImmutableBytesWritable implements WritableComparable { /** {@inheritDoc} */ @Override public boolean equals(Object right_obj) { + if (right_obj instanceof byte []) { + return compareTo((byte [])right_obj) == 0; + } if (right_obj instanceof ImmutableBytesWritable) { return compareTo(right_obj) == 0; } diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 8e8f1d43007..0a5208473f3 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -26,6 +26,7 @@ import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType; import org.apache.hadoop.io.Text; /** @@ -41,6 +42,7 @@ public abstract class HBaseTestCase extends TestCase { protected static final char LAST_CHAR = 'z'; protected static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}; + protected static final int MAXVERSIONS = 3; static 
{ StaticTestEnvironment.initialize(); @@ -100,10 +102,18 @@ public abstract class HBaseTestCase extends TestCase { } protected HTableDescriptor createTableDescriptor(final String name) { + return createTableDescriptor(name, MAXVERSIONS); + } + + protected HTableDescriptor createTableDescriptor(final String name, + final int versions) { HTableDescriptor htd = new HTableDescriptor(name); - htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1)); - htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2)); - htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3)); + htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions, + CompressionType.NONE, false, Integer.MAX_VALUE, null)); + htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions, + CompressionType.NONE, false, Integer.MAX_VALUE, null)); + htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions, + CompressionType.NONE, false, Integer.MAX_VALUE, null)); return htd; } @@ -123,18 +133,18 @@ public abstract class HBaseTestCase extends TestCase { if (startKeyBytes == null || startKeyBytes.length == 0) { startKeyBytes = START_KEY_BYTES; } - addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1); + addContent(new HRegionIncommon(r), column, startKeyBytes, endKey, -1); } /** * Add content to region r on the passed column * column. * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param updater An instance of {@link Loader}. + * @param updater An instance of {@link Incommon}. * @param column * @throws IOException */ - protected static void addContent(final Loader updater, final String column) + protected static void addContent(final Incommon updater, final String column) throws IOException { addContent(updater, column, START_KEY_BYTES, null); } @@ -143,13 +153,13 @@ public abstract class HBaseTestCase extends TestCase { * Add content to region r on the passed column * column. * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param updater An instance of {@link Loader}. + * @param updater An instance of {@link Incommon}. * @param column * @param startKeyBytes Where to start the rows inserted * @param endKey Where to stop inserting rows. * @throws IOException */ - protected static void addContent(final Loader updater, final String column, + protected static void addContent(final Incommon updater, final String column, final byte [] startKeyBytes, final Text endKey) throws IOException { addContent(updater, column, startKeyBytes, endKey, -1); @@ -159,14 +169,14 @@ public abstract class HBaseTestCase extends TestCase { * Add content to region r on the passed column * column. * Adds data of the from 'aaa', 'aab', etc where key and value are the same. - * @param updater An instance of {@link Loader}. + * @param updater An instance of {@link Incommon}. * @param column * @param startKeyBytes Where to start the rows inserted * @param endKey Where to stop inserting rows. * @param ts Timestamp to write the content with. * @throws IOException */ - protected static void addContent(final Loader updater, final String column, + protected static void addContent(final Incommon updater, final String column, final byte [] startKeyBytes, final Text endKey, final long ts) throws IOException { // Add rows of three characters. The first character starts with the @@ -207,23 +217,42 @@ public abstract class HBaseTestCase extends TestCase { } /** - * Interface used by the addContent methods so either a HTable or a HRegion - * can be passed to the methods. 
+ * Implementors can flushcache. */ - public static interface Loader { - public long startBatchUpdate(final Text row) throws IOException; - public void put(long lockid, Text column, byte val[]) throws IOException; - public void commit(long lockid) throws IOException; - public void commit(long lockid, long ts) throws IOException; - public void abort(long lockid) throws IOException; + public static interface FlushCache { + public void flushcache() throws IOException; } /** - * A class that makes a {@link Loader} out of a {@link HRegion} + * Interface used by tests so they can do common operations against an HTable + * or an HRegion. + * + * TODO: Come up w/ a better name for this interface. */ - public static class HRegionLoader implements Loader { + public static interface Incommon { + public byte [] get(Text row, Text column) throws IOException; + public byte [][] get(Text row, Text column, int versions) + throws IOException; + public byte [][] get(Text row, Text column, long ts, int versions) + throws IOException; + public long startBatchUpdate(final Text row) throws IOException; + public void put(long lockid, Text column, byte val[]) throws IOException; + public void delete(long lockid, Text column) throws IOException; + public void deleteAll(Text row, Text column, long ts) throws IOException; + public void commit(long lockid) throws IOException; + public void commit(long lockid, long ts) throws IOException; + public void abort(long lockid) throws IOException; + public HScannerInterface getScanner(Text [] columns, Text firstRow, + long ts) + throws IOException; + } + + /** + * A class that makes an {@link Incommon} out of a {@link HRegion} + */ + public static class HRegionIncommon implements Incommon { final HRegion region; - public HRegionLoader(final HRegion HRegion) { + public HRegionIncommon(final HRegion HRegion) { super(); this.region = HRegion; } @@ -231,7 +260,7 @@ public abstract class HBaseTestCase extends TestCase { this.region.abort(lockid); } public void commit(long lockid) throws IOException { - this.region.commit(lockid, System.currentTimeMillis()); + this.region.commit(lockid); } public void commit(long lockid, final long ts) throws IOException { this.region.commit(lockid, ts); @@ -239,17 +268,38 @@ public abstract class HBaseTestCase extends TestCase { public void put(long lockid, Text column, byte[] val) throws IOException { this.region.put(lockid, column, val); } + public void delete(long lockid, Text column) throws IOException { + this.region.delete(lockid, column); + } + public void deleteAll(Text row, Text column, long ts) throws IOException { + this.region.deleteAll(row, column, ts); + } public long startBatchUpdate(Text row) throws IOException { return this.region.startUpdate(row); } + public HScannerInterface getScanner(Text [] columns, Text firstRow, + long ts) + throws IOException { + return this.region.getScanner(columns, firstRow, ts, null); + } + public byte[] get(Text row, Text column) throws IOException { + return this.region.get(row, column); + } + public byte[][] get(Text row, Text column, int versions) throws IOException { + return this.region.get(row, column, versions); + } + public byte[][] get(Text row, Text column, long ts, int versions) + throws IOException { + return this.region.get(row, column, ts, versions); + } } /** - * A class that makes a {@link Loader} out of a {@link HTable} + * A class that makes an {@link Incommon} out of a {@link HTable} */ - public static class HTableLoader implements Loader { + public static class HTableIncommon implements
Incommon { final HTable table; - public HTableLoader(final HTable table) { + public HTableIncommon(final HTable table) { super(); this.table = table; } @@ -265,8 +315,30 @@ public abstract class HBaseTestCase extends TestCase { public void put(long lockid, Text column, byte[] val) throws IOException { this.table.put(lockid, column, val); } + public void delete(long lockid, Text column) throws IOException { + this.table.delete(lockid, column); + } + public void deleteAll(Text row, Text column, long ts) throws IOException { + this.table.deleteAll(row, column, ts); + } public long startBatchUpdate(Text row) { return this.table.startUpdate(row); } + public HScannerInterface getScanner(Text [] columns, Text firstRow, + long ts) + throws IOException { + return this.table.obtainScanner(columns, firstRow, ts, null); + } + public byte[] get(Text row, Text column) throws IOException { + return this.table.get(row, column); + } + public byte[][] get(Text row, Text column, int versions) + throws IOException { + return this.table.get(row, column, versions); + } + public byte[][] get(Text row, Text column, long ts, int versions) + throws IOException { + return this.table.get(row, column, ts, versions); + } } } \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java index 6ad4158b23f..23b5d4967c7 100644 --- a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -417,4 +417,15 @@ public class MiniHBaseCluster implements HConstants { } f.delete(); } + + /** + * Call flushCache on all regions on all participating regionservers. + * @throws IOException + */ + void flushcache() throws IOException { + HRegionServer s = this.regionThreads.get(0).getRegionServer(); + for(HRegion r: s.onlineRegions.values() ) { + r.flushcache(false); + } + } } \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java index df385ca3a29..7fc0326cfa1 100644 --- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java +++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java @@ -54,7 +54,7 @@ public class MultiRegionTable extends HBaseTestCase { HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); int count = count(meta, HConstants.COLUMN_FAMILY_STR); HTable t = new HTable(conf, new Text(tableName)); - addContent(new HTableLoader(t), columnName); + addContent(new HTableIncommon(t), columnName); // All is running in the one JVM so I should be able to get the single // region instance and bring on a split. 
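Note on the test scaffolding above: HBaseTestCase replaces the old Loader interface with the Incommon and FlushCache pair so a single test core can be pointed at either an HRegion (in-process) or an HTable (going through a cluster). The following is a minimal standalone sketch of that adapter idea only; the names in it (Incommonish, Flusher, MapBacked, SortedBacked) are illustrative and are not HBase classes or APIs.

/*
 * Illustrative sketch (not part of the patch): one test core written against a
 * small interface, run against two different backing implementations, the same
 * way the Incommon tests run against both HRegion and HTable.
 */
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class IncommonSketch {
  /** The operations the shared test core needs, regardless of backing store. */
  interface Incommonish {
    void put(String row, String value);
    String get(String row);
    void delete(String row);
  }

  /** Something the test core can ask to flush, mirroring FlushCache. */
  interface Flusher {
    void flush();
  }

  /** First backing implementation: an unordered in-memory map. */
  static class MapBacked implements Incommonish, Flusher {
    private final Map<String, String> data = new HashMap<String, String>();
    public void put(String row, String value) { data.put(row, value); }
    public String get(String row) { return data.get(row); }
    public void delete(String row) { data.remove(row); }
    public void flush() { /* nothing to flush for an in-memory map */ }
  }

  /** Second backing implementation: a sorted map standing in for "the other" store. */
  static class SortedBacked implements Incommonish, Flusher {
    private final TreeMap<String, String> data = new TreeMap<String, String>();
    public void put(String row, String value) { data.put(row, value); }
    public String get(String row) { return data.get(row); }
    public void delete(String row) { data.remove(row); }
    public void flush() { /* nothing to flush for an in-memory map */ }
  }

  /** The shared test core: written once, exercised against any Incommonish. */
  static void runCore(Incommonish store, Flusher flusher) {
    store.put("row1", "v1");
    if (!"v1".equals(store.get("row1"))) throw new AssertionError("put/get failed");
    flusher.flush();
    store.delete("row1");
    if (store.get("row1") != null) throw new AssertionError("delete failed");
  }

  public static void main(String[] args) {
    MapBacked m = new MapBacked();
    runCore(m, m);
    SortedBacked s = new SortedBacked();
    runCore(s, s);
    System.out.println("test core ran against both implementations");
  }
}

This is the design choice the TestTimestamp changes below rely on: the delete, deleteAll and scanner coverage is written once in doTestDelete and doTestTimestampScanning and then exercised both locally and through the client API.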
diff --git a/src/test/org/apache/hadoop/hbase/TestCompaction.java b/src/test/org/apache/hadoop/hbase/TestCompaction.java index 9cf4a8a0e08..e5c51095be3 100644 --- a/src/test/org/apache/hadoop/hbase/TestCompaction.java +++ b/src/test/org/apache/hadoop/hbase/TestCompaction.java @@ -23,71 +23,133 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.Text; /** * Test compactions */ public class TestCompaction extends HBaseTestCase { static final Log LOG = LogFactory.getLog(TestCompaction.class.getName()); - + private HLog hlog = null; + private HRegion r = null; + private static final String COLUMN_FAMILY = COLFAMILY_NAME1; + private static final Text STARTROW = new Text(START_KEY_BYTES); + private static final Text COLUMN_FAMILY_TEXT = new Text(COLUMN_FAMILY); + private static final Text COLUMN_FAMILY_TEXT_MINUS_COLON = + new Text(COLUMN_FAMILY.substring(0, COLUMN_FAMILY.length() - 1)); + private static final int COMPACTION_THRESHOLD = MAXVERSIONS; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.hlog = new HLog(this.localFs, this.testDir, this.conf); + HTableDescriptor htd = createTableDescriptor(getName()); + HRegionInfo hri = new HRegionInfo(1, htd, null, null); + this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + } + + @Override + public void tearDown() throws Exception { + this.r.close(); + this.hlog.closeAndDelete(); + super.tearDown(); + } + /** * Run compaction and flushing memcache + * Assert deletes get cleaned up. * @throws Exception */ public void testCompaction() throws Exception { - HLog hlog = new HLog(this.localFs, this.testDir, this.conf); - HTableDescriptor htd = createTableDescriptor(getName()); - HRegionInfo hri = new HRegionInfo(1, htd, null, null); - final HRegion r = - new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); - try { + createStoreFile(r); + assertFalse(r.needsCompaction()); + for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); - assertFalse(r.needsCompaction()); - int compactionThreshold = - this.conf.getInt("hbase.hstore.compactionThreshold", 3); - for (int i = 0; i < compactionThreshold; i++) { - createStoreFile(r); + } + assertTrue(r.needsCompaction()); + // Add more content. Now there are about 5 versions of each column. + // Default is that there are only 3 (MAXVERSIONS) versions allowed per column. + // Assert > 3 and then after compaction, assert that only 3 versions + // are available. + addContent(new HRegionIncommon(r), COLUMN_FAMILY); + byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); + // Assert that I can get > 5 versions (Should be at least 5 in there). + assertTrue(bytes.length >= 5); + // Try to run compaction concurrent with a thread flush just to see that + // we can. + final HRegion region = this.r; + Thread t1 = new Thread() { + @Override + public void run() { + try { + region.flushcache(false); + } catch (IOException e) { + e.printStackTrace(); + } } - assertTrue(r.needsCompaction()); - // Try to run compaction concurrent with a thread flush.
- addContent(new HRegionLoader(r), COLFAMILY_NAME1); - Thread t1 = new Thread() { - @Override - public void run() { - try { - r.flushcache(false); - } catch (IOException e) { - e.printStackTrace(); - } + }; + Thread t2 = new Thread() { + @Override + public void run() { + try { + assertTrue(region.compactStores()); + } catch (IOException e) { + e.printStackTrace(); } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - assertTrue(r.compactStores()); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - t1.setDaemon(true); - t1.start(); - t2.setDaemon(true); - t2.start(); - t1.join(); - t2.join(); - } finally { - r.close(); - hlog.closeAndDelete(); + } + }; + t1.setDaemon(true); + t1.start(); + t2.setDaemon(true); + t2.start(); + t1.join(); + t2.join(); + // Now assert that there are 4 versions of a record only: that's the + // 3 versions that should be in the compacted store and then the one more + // we added when we compacted. + byte [] secondRowBytes = new byte[START_KEY_BYTES.length]; + System.arraycopy(START_KEY_BYTES, 0, secondRowBytes, 0, + START_KEY_BYTES.length); + // Increment the least significant character so we get to next row. + secondRowBytes[START_KEY_BYTES.length - 1]++; + Text secondRow = new Text(secondRowBytes); + bytes = this.r.get(secondRow, COLUMN_FAMILY_TEXT, 100/*Too many*/); + assertTrue(bytes.length == 4); + // Now add deletes to memcache and then flush it. That will put us over + // the compaction threshold of 3 store files. Compacting these store files + // should result in a compacted store file that has no references to the + // deleted row. + this.r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis()); + // Now, before compacting, remove all instances of the first row so we can + // verify that it is removed as we compact. + // Assert all deleted. + assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); + this.r.flushcache(false); + assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); + assertTrue(this.r.needsCompaction()); + this.r.compactStores(); + // Assert that the first row is still deleted. + bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); + assertNull(bytes); + // Assert the store files do not have the first record 'aaa' keys in them.
+ for (MapFile.Reader reader: + this.r.stores.get(COLUMN_FAMILY_TEXT_MINUS_COLON).readers.values()) { + reader.reset(); + HStoreKey key = new HStoreKey(); + ImmutableBytesWritable val = new ImmutableBytesWritable(); + while(reader.next(key, val)) { + assertFalse(key.getRow().equals(STARTROW)); + } } } - - private void createStoreFile(final HRegion r) throws IOException { - HRegionLoader loader = new HRegionLoader(r); - for (int i = 0; i < 3; i++) { - addContent(loader, COLFAMILY_NAME1); + + private void createStoreFile(final HRegion region) throws IOException { + HRegionIncommon loader = new HRegionIncommon(region); + for (int i = 0; i < 1; i++) { + addContent(loader, COLUMN_FAMILY); } - r.flushcache(false); + region.flushcache(false); } -} \ No newline at end of file +} diff --git a/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java b/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java index 27d25377f69..fc4331b2172 100644 --- a/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java +++ b/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java @@ -45,32 +45,27 @@ public class TestMasterAdmin extends HBaseClusterTestCase { admin.disableTable(testDesc.getName()); try { - try { - @SuppressWarnings("unused") - HTable table = new HTable(conf, testDesc.getName()); + @SuppressWarnings("unused") + HTable table = new HTable(conf, testDesc.getName()); - } catch(IllegalStateException e) { - // Expected - } - - admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:")); - admin.enableTable(testDesc.getName()); - try { - admin.deleteColumn(testDesc.getName(), new Text("col2:")); - - } catch(TableNotDisabledException e) { - // Expected - } - - admin.disableTable(testDesc.getName()); - admin.deleteColumn(testDesc.getName(), new Text("col2:")); + } catch(IllegalStateException e) { + // Expected - } catch(Exception e) { - e.printStackTrace(); - fail(); - - } finally { - admin.deleteTable(testDesc.getName()); + // This exception is not actually thrown. It doesn't look like it should + // be thrown since the connection manager is already filled w/ data + // -- noticed by St.Ack 09/09/2007 } + + admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:")); + admin.enableTable(testDesc.getName()); + try { + admin.deleteColumn(testDesc.getName(), new Text("col2:")); + } catch(TableNotDisabledException e) { + // Expected + } + + admin.disableTable(testDesc.getName()); + admin.deleteColumn(testDesc.getName(), new Text("col2:")); + admin.deleteTable(testDesc.getName()); } } diff --git a/src/test/org/apache/hadoop/hbase/TestScanner2.java b/src/test/org/apache/hadoop/hbase/TestScanner2.java index 4ac1b8f8c95..3eed6cb2ad2 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner2.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner2.java @@ -172,11 +172,12 @@ public class TestScanner2 extends HBaseClusterTestCase { * @throws IOException */ public void testSplitDeleteOneAddTwoRegions() throws IOException { + HTable metaTable = new HTable(conf, HConstants.META_TABLE_NAME); // First add a new table. Its intial region will be added to META region.
HBaseAdmin admin = new HBaseAdmin(conf); Text tableName = new Text(getName()); admin.createTable(new HTableDescriptor(tableName.toString())); - List regions = scan(conf, HConstants.META_TABLE_NAME); + List regions = scan(conf, metaTable); assertEquals("Expected one region", regions.size(), 1); HRegionInfo region = regions.get(0); assertTrue("Expected region named for test", @@ -196,10 +197,10 @@ public class TestScanner2 extends HBaseClusterTestCase { homedir, this.conf, null)); try { for (HRegion r : newRegions) { - addRegionToMETA(conf, HConstants.META_TABLE_NAME, r, - this.cluster.getHMasterAddress(), -1L); + addRegionToMETA(conf, metaTable, r, this.cluster.getHMasterAddress(), + -1L); } - regions = scan(conf, HConstants.META_TABLE_NAME); + regions = scan(conf, metaTable); assertEquals("Should be two regions only", 2, regions.size()); } finally { for (HRegion r : newRegions) { @@ -209,14 +210,13 @@ public class TestScanner2 extends HBaseClusterTestCase { } } - private List scan(final Configuration conf, final Text table) + private List scan(final Configuration conf, final HTable t) throws IOException { List regions = new ArrayList(); HRegionInterface regionServer = null; long scannerId = -1L; try { - HTable t = new HTable(conf, table); - HRegionLocation rl = t.getRegionLocation(table); + HRegionLocation rl = t.getRegionLocation(t.getTableName()); regionServer = t.getConnection().getHRegionConnection(rl.getServerAddress()); scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(), HConstants.COLUMN_FAMILY_ARRAY, new Text(), @@ -263,25 +263,24 @@ public class TestScanner2 extends HBaseClusterTestCase { } private void addRegionToMETA(final Configuration conf, - final Text table, final HRegion region, + final HTable t, final HRegion region, final HServerAddress serverAddress, final long startCode) throws IOException { - HTable t = new HTable(conf, table); - try { - long lockid = t.startUpdate(region.getRegionName()); - t.put(lockid, HConstants.COL_REGIONINFO, - Writables.getBytes(region.getRegionInfo())); - t.put(lockid, HConstants.COL_SERVER, - Writables.stringToBytes(serverAddress.toString())); - t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode)); - t.commit(lockid); - if (LOG.isDebugEnabled()) { - LOG.info("Added region " + region.getRegionName() + " to table " + - table); - } - } finally { - t.close(); + long lockid = t.startUpdate(region.getRegionName()); + t.put(lockid, HConstants.COL_REGIONINFO, + Writables.getBytes(region.getRegionInfo())); + t.put(lockid, HConstants.COL_SERVER, + Writables.stringToBytes(serverAddress.toString())); + t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode)); + t.commit(lockid); + // Assert added. 
+ byte [] bytes = t.get(region.getRegionName(), HConstants.COL_REGIONINFO); + HRegionInfo hri = Writables.getHRegionInfo(bytes); + assertEquals(hri.getRegionId(), region.getRegionId()); + if (LOG.isDebugEnabled()) { + LOG.info("Added region " + region.getRegionName() + " to table " + + t.getTableName()); } } diff --git a/src/test/org/apache/hadoop/hbase/TestTimestamp.java b/src/test/org/apache/hadoop/hbase/TestTimestamp.java index 44e63fbff40..0f224771359 100644 --- a/src/test/org/apache/hadoop/hbase/TestTimestamp.java +++ b/src/test/org/apache/hadoop/hbase/TestTimestamp.java @@ -21,10 +21,14 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.TreeMap; +import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; /** - * Tests user specifiable time stamps + * Tests user-specifiable timestamps when putting, getting and scanning. Also + * tests the same in the presence of deletes. Test cores are written so they can be + * run against an HRegion and against an HTable: i.e. both local and remote. */ public class TestTimestamp extends HBaseTestCase { private static final long T0 = 10L; @@ -32,74 +36,196 @@ public class TestTimestamp extends HBaseTestCase { private static final long T2 = 200L; private static final String COLUMN_NAME = "contents:"; - private static final String TABLE_NAME = "test"; - private static final String VERSION1 = "version1"; - private static final String LATEST = "latest"; private static final Text COLUMN = new Text(COLUMN_NAME); - private static final Text[] COLUMNS = { - COLUMN - }; - private static final Text TABLE = new Text(TABLE_NAME); + private static final Text[] COLUMNS = {COLUMN}; private static final Text ROW = new Text("row"); + + // When creating column descriptor, how many versions of a cell to allow. + private static final int VERSIONS = 3; /** * Test that delete works according to description in hadoop-1784 - * when it comes to timestamps. + * href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784. * @throws IOException */ public void testDelete() throws IOException { - HRegion r = createRegion(); + final HRegion r = createRegion(); try { - HRegionLoader loader = new HRegionLoader(r); - // Add a couple of values for three different timestamps. - addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0); - addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1); - addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T2); - addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad")); - // If I delete w/o specifying a timestamp, this means I'm deleting the - // latest. - delete(r, System.currentTimeMillis()); - // Verify that I get back T2 through T0. + doTestDelete(new HRegionIncommon(r), new FlushCache() { + public void flushcache() throws IOException { + r.flushcache(false); + } + }); } finally { r.close(); r.getLog().closeAndDelete(); } } - - private void delete(final HRegion r, final long ts) throws IOException { - long lockid = r.startUpdate(ROW); - r.delete(lockid, COLUMN); - r.commit(lockid, ts == -1? System.currentTimeMillis(): ts); - } - + /** * Test scanning against different timestamps. * @throws IOException */ public void testTimestampScanning() throws IOException { - HRegion r = createRegion(); + final HRegion r = createRegion(); try { - HRegionLoader loader = new HRegionLoader(r); - // Add a couple of values for three different timestamps.
- addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0); - addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1); - addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad")); - // Get count of latest items. - int count = assertScanContentTimestamp(r, System.currentTimeMillis()); - // Assert I get same count when I scan at each timestamp. - assertEquals(count, assertScanContentTimestamp(r, T0)); - assertEquals(count, assertScanContentTimestamp(r, T1)); - // Flush everything out to disk and then retry - r.flushcache(false); - assertEquals(count, assertScanContentTimestamp(r, T0)); - assertEquals(count, assertScanContentTimestamp(r, T1)); + doTestTimestampScanning(new HRegionIncommon(r), new FlushCache() { + public void flushcache() throws IOException { + r.flushcache(false); + } + }); } finally { r.close(); r.getLog().closeAndDelete(); } } + + /** + * Basic test of timestamps. + * Do the above tests from client side. + * @throws IOException + */ + public void testTimestamps() throws IOException { + final MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1); + try { + HTable t = createTable(); + Incommon incommon = new HTableIncommon(t); + doTestDelete(incommon, new FlushCache() { + public void flushcache() throws IOException { + cluster.flushcache(); + } + }); + + // Perhaps drop and readd the table between tests so the former does + // not pollute the latter? Or put into separate tests. + doTestTimestampScanning(incommon, new FlushCache() { + public void flushcache() throws IOException { + cluster.flushcache(); + } + }); + } catch (Exception e) { + cluster.shutdown(); + } + } + + /* + * Run test that delete works according to description in hadoop-1784. + * @param incommon + * @param flusher + * @throws IOException + */ + private void doTestDelete(final Incommon incommon, FlushCache flusher) + throws IOException { + // Add values at various timestamps (Values are timestamps as bytes). + put(incommon, T0); + put(incommon, T1); + put(incommon, T2); + put(incommon); + // Verify that returned versions match passed timestamps. + assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T2, T1}); + // If I delete w/o specifying a timestamp, this means I'm deleting the + // latest. + delete(incommon); + // Verify that I get back T2 through T1 -- that the latest version has + // been deleted. + assertVersions(incommon, new long [] {T2, T1, T0}); + + // Flush everything out to disk and then retry + flusher.flushcache(); + assertVersions(incommon, new long [] {T2, T1, T0}); + + // Now add back a latest so I can test removing other than the latest. + put(incommon); + assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T2, T1}); + delete(incommon, T2); + assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T1, T0}); + // Flush everything out to disk and then retry + flusher.flushcache(); + assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T1, T0}); + + // Now try deleting all from T2 back inclusive (We first need to add T2 + // back into the mix and to make things a little interesting, delete and + // then readd T1). + put(incommon, T2); + delete(incommon, T1); + put(incommon, T1); + incommon.deleteAll(ROW, COLUMN, T2); + // Should only be current value in set.
Assert this is so + assertOnlyLatest(incommon, HConstants.LATEST_TIMESTAMP); + + // Flush everything out to disk and then redo above tests + flusher.flushcache(); + assertOnlyLatest(incommon, HConstants.LATEST_TIMESTAMP); + } + + private void assertOnlyLatest(final Incommon incommon, + final long currentTime) + throws IOException { + byte [][] bytesBytes = incommon.get(ROW, COLUMN, 3/*Ask for too much*/); + assertEquals(1, bytesBytes.length); + long time = Writables.bytesToLong(bytesBytes[0]); + assertEquals(time, currentTime); + assertNull(incommon.get(ROW, COLUMN, T1, 3 /*Too many*/)); + assertTrue(assertScanContentTimestamp(incommon, T1) == 0); + } + + /* + * Assert that returned versions match passed in timestamps and that results + * are returned in the right order. Assert that values when converted to + * longs match the corresponding passed timestamp. + * @param r + * @param tss + * @throws IOException + */ + private void assertVersions(final Incommon incommon, final long [] tss) + throws IOException { + // Assert that 'latest' is what we expect. + byte [] bytes = incommon.get(ROW, COLUMN); + assertEquals(Writables.bytesToLong(bytes), tss[0]); + // Now assert that if we ask for multiple versions, that they come out in + // order. + byte [][] bytesBytes = incommon.get(ROW, COLUMN, tss.length); + assertEquals(bytesBytes.length, tss.length); + for (int i = 0; i < bytesBytes.length; i++) { + long ts = Writables.bytesToLong(bytesBytes[i]); + assertEquals(ts, tss[i]); + } + // Specify a timestamp get multiple versions. + bytesBytes = incommon.get(ROW, COLUMN, tss[0], bytesBytes.length - 1); + for (int i = 1; i < bytesBytes.length; i++) { + long ts = Writables.bytesToLong(bytesBytes[i]); + assertEquals(ts, tss[i]); + } + // Test scanner returns expected version + assertScanContentTimestamp(incommon, tss[0]); + } + + /* + * Run test scanning different timestamps. + * @param incommon + * @param flusher + * @throws IOException + */ + private void doTestTimestampScanning(final Incommon incommon, + final FlushCache flusher) + throws IOException { + // Add a couple of values for three different timestamps. + put(incommon, T0); + put(incommon, T1); + put(incommon, HConstants.LATEST_TIMESTAMP); + // Get count of latest items. + int count = assertScanContentTimestamp(incommon, + HConstants.LATEST_TIMESTAMP); + // Assert I get same count when I scan at each timestamp. + assertEquals(count, assertScanContentTimestamp(incommon, T0)); + assertEquals(count, assertScanContentTimestamp(incommon, T1)); + // Flush everything out to disk and then retry + flusher.flushcache(); + assertEquals(count, assertScanContentTimestamp(incommon, T0)); + assertEquals(count, assertScanContentTimestamp(incommon, T1)); + } /* * Assert that the scan returns only values < timestamp. @@ -108,19 +234,21 @@ public class TestTimestamp extends HBaseTestCase { * @return Count of items scanned. 
* @throws IOException */ - private int assertScanContentTimestamp(final HRegion r, final long ts) + private int assertScanContentTimestamp(final Incommon in, final long ts) throws IOException { + HScannerInterface scanner = + in.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts); int count = 0; - HInternalScannerInterface scanner = - r.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts, null); try { HStoreKey key = new HStoreKey(); TreeMapvalue = new TreeMap(); while (scanner.next(key, value)) { assertTrue(key.getTimestamp() <= ts); - Text row = key.getRow(); - assertEquals(row.toString(), - new String(value.get(COLUMN), HConstants.UTF8_ENCODING)); + // Content matches the key or HConstants.LATEST_TIMESTAMP. + // (Key does not match content if we 'put' with LATEST_TIMESTAMP). + long l = Writables.bytesToLong(value.get(COLUMN)); + assertTrue(key.getTimestamp() == l || + HConstants.LATEST_TIMESTAMP == l); count++; value.clear(); } @@ -129,118 +257,48 @@ public class TestTimestamp extends HBaseTestCase { } return count; } - - /** - * Basic test of timestamps. - * TODO: Needs rewrite after hadoop-1784 gets fixed. - * @throws IOException - */ - public void testTimestamps() throws IOException { - MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1); - try { - HTable table = createTable(); - - // store a value specifying an update time - put(table, VERSION1.getBytes(HConstants.UTF8_ENCODING), T0); - - // store a value specifying 'now' as the update time - put(table, LATEST.getBytes(HConstants.UTF8_ENCODING), -1); - - // delete values older than T1 - long lockid = table.startUpdate(ROW); - table.delete(lockid, COLUMN); - table.commit(lockid, T1); - - // now retrieve... - assertGets(table); - - // flush everything out to disk - HRegionServer s = cluster.regionThreads.get(0).getRegionServer(); - for(HRegion r: s.onlineRegions.values() ) { - r.flushcache(false); - } - - // now retrieve... - assertGets(table); - - // Test scanners - assertScanCount(table, -1, 1); - assertScanCount(table, T1, 0); - } catch (Exception e) { - cluster.shutdown(); - } - } - /* - * Test count of results scanning. - * @param table - * @param ts - * @param expectedCount - * @throws IOException - */ - private void assertScanCount(final HTable table, final long ts, - final int expectedCount) + private void put(final Incommon loader, final long ts) throws IOException { - HScannerInterface scanner = (ts == -1)? - table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW): - table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts); - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - int count = 0; - while(scanner.next(key, results)) { - count++; - } - assertEquals(count, expectedCount); - assertEquals(results.size(), expectedCount); - - } finally { - scanner.close(); - } + put(loader, Writables.longToBytes(ts), ts); + } + + private void put(final Incommon loader) + throws IOException { + long ts = HConstants.LATEST_TIMESTAMP; + put(loader, Writables.longToBytes(ts), ts); } /* - * Test can do basic gets. - * Used by testTimestamp above. 
- * @param table - * @throws IOException - */ - private void assertGets(final HTable table) throws IOException { - // the most recent version: - byte[] bytes = table.get(ROW, COLUMN); - assertTrue(bytes != null && bytes.length != 0); - assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING))); - - // any version <= time T1 - byte[][] values = table.get(ROW, COLUMN, T1, 3); - assertNull(values); - - // the version from T0 - values = table.get(ROW, COLUMN, T0, 3); - assertTrue(values.length == 1 - && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING))); - - // three versions older than now - values = table.get(ROW, COLUMN, 3); - assertTrue(values.length == 1 - && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING))); - } - - /* - * Put values. - * @param table + * Put values. + * @param loader * @param bytes * @param ts * @throws IOException */ - private void put(final HTable table, final byte [] bytes, final long ts) + private void put(final Incommon loader, final byte [] bytes, + final long ts) throws IOException { - long lockid = table.startUpdate(ROW); - table.put(lockid, COLUMN, bytes); - if (ts == -1) { - table.commit(lockid); + long lockid = loader.startBatchUpdate(ROW); + loader.put(lockid, COLUMN, bytes); + if (ts == HConstants.LATEST_TIMESTAMP) { + loader.commit(lockid); } else { - table.commit(lockid, ts); + loader.commit(lockid, ts); + } + } + + private void delete(final Incommon loader) throws IOException { + delete(loader, HConstants.LATEST_TIMESTAMP); + } + + private void delete(final Incommon loader, final long ts) throws IOException { + long lockid = loader.startBatchUpdate(ROW); + loader.delete(lockid, COLUMN); + if (ts == HConstants.LATEST_TIMESTAMP) { + loader.commit(lockid); + } else { + loader.commit(lockid, ts); } } @@ -250,17 +308,18 @@ public class TestTimestamp extends HBaseTestCase { * @throws IOException */ private HTable createTable() throws IOException { - HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + HTableDescriptor desc = new HTableDescriptor(getName()); desc.addFamily(new HColumnDescriptor(COLUMN_NAME)); HBaseAdmin admin = new HBaseAdmin(conf); admin.createTable(desc); - return new HTable(conf, TABLE); + return new HTable(conf, new Text(getName())); } private HRegion createRegion() throws IOException { HLog hlog = new HLog(this.localFs, this.testDir, this.conf); HTableDescriptor htd = createTableDescriptor(getName()); - htd.addFamily(new HColumnDescriptor(COLUMN_NAME)); + htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS, + CompressionType.NONE, false, Integer.MAX_VALUE, null)); HRegionInfo hri = new HRegionInfo(1, htd, null, null); return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); }
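For reference, the delete behaviour these timestamp tests exercise is: a delete committed without a timestamp masks the latest cell, a delete committed at a timestamp masks the cell whose timestamp matches it exactly, and deleteAll masks every cell with a timestamp equal to or older than the one passed. Below is a small illustrative model of just those visibility rules for a single row/column; it is not HBase code (a real store records delete tombstones rather than dropping entries), and the class and method names are hypothetical.

/*
 * Illustrative model only: the visible effect of delete vs. deleteAll on the
 * versions of one row/column. Versions are kept newest-first.
 */
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

public class DeleteSemanticsSketch {
  // Timestamp -> value for one row/column, newest timestamp first.
  private final TreeMap<Long, byte[]> versions =
      new TreeMap<Long, byte[]>(Collections.reverseOrder());

  void put(long ts, byte[] value) { versions.put(ts, value); }

  /** Delete the single cell whose timestamp matches ts exactly. */
  void delete(long ts) { versions.remove(ts); }

  /** Delete every cell with timestamp equal to or older than ts. */
  void deleteAll(long ts) {
    // With the reverse-order comparator, tailMap(ts) is the inclusive view of
    // all keys numerically <= ts; clearing the view clears the backing map.
    versions.tailMap(ts).clear();
  }

  /** Return up to maxVersions newest values, or null if nothing is left. */
  byte[][] get(int maxVersions) {
    if (versions.isEmpty()) return null;
    int n = Math.min(maxVersions, versions.size());
    byte[][] out = new byte[n][];
    int i = 0;
    for (Map.Entry<Long, byte[]> e : versions.entrySet()) {
      if (i == n) break;
      out[i++] = e.getValue();
    }
    return out;
  }

  public static void main(String[] args) {
    DeleteSemanticsSketch cell = new DeleteSemanticsSketch();
    cell.put(10L, "t10".getBytes());
    cell.put(100L, "t100".getBytes());
    cell.put(200L, "t200".getBytes());
    cell.delete(200L);            // masks only the ts=200 version
    if (cell.get(3).length != 2) throw new AssertionError("expected 2 versions");
    cell.deleteAll(100L);         // masks ts=100 and ts=10
    if (cell.get(3) != null) throw new AssertionError("expected no versions left");
    System.out.println("delete semantics sketch ran");
  }
}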