HADOOP-1784 Delete: Fix scanners and gets so they work properly in the
presence of deletes. Added a deleteAll to remove all cells equal to or
older than the passed timestamp. Fixed compaction so deleted cells do not
make it out into compacted output. Also ensure that versions beyond the
column family maximum are dropped during compaction.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@574287 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2007-09-10 15:56:16 +00:00
parent 361f74c0c2
commit 2cc1c9f65b
21 changed files with 1208 additions and 506 deletions

CHANGES.txt

@ -8,6 +8,11 @@ Trunk (unreleased changes)
NEW FEATURES
HADOOP-1768 FS command using Hadoop FsShell operations
(Edward Yoon via Stack)
HADOOP-1784 Delete: Fix scanners and gets so they work properly in the
presence of deletes. Added a deleteAll to remove all cells equal to or
older than the passed timestamp. Fixed compaction so deleted cells
do not make it out into compacted output. Also ensure that versions
beyond the column family maximum are dropped during compaction.
OPTIMIZATIONS

HAbstractScanner.java

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import java.util.regex.Pattern;
@ -205,7 +206,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
*
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
*/
public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
throws IOException {
// Find the next row label (and timestamp)
Text chosenRow = null;
@ -218,7 +219,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
|| (keys[i].getRow().compareTo(chosenRow) < 0)
|| ((keys[i].getRow().compareTo(chosenRow) == 0)
&& (keys[i].getTimestamp() > chosenTimestamp)))) {
chosenRow = new Text(keys[i].getRow());
chosenTimestamp = keys[i].getTimestamp();
}
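
Illustration (not part of the commit): the compound condition above implements a simple selection rule, paraphrased here in plain Java (String stands in for Text). Among the current keys of all sub-scanners, pick the lexicographically smallest row; on a row tie, prefer the newer timestamp.

// Returns true if the candidate key should replace the current choice.
// Mirrors the compound condition in HAbstractScanner.next() above.
static boolean betterChoice(final String candRow, final long candTs,
    final String chosenRow, final long chosenTs) {
  return chosenRow == null
      || candRow.compareTo(chosenRow) < 0
      || (candRow.compareTo(chosenRow) == 0 && candTs > chosenTs);
}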

HConstants.java

@ -103,7 +103,7 @@ public interface HConstants {
// be the first to be reassigned if the server(s) they are being served by
// should go down.
/** The root table's name. */
/** The root table's name.*/
static final Text ROOT_TABLE_NAME = new Text("-ROOT-");
/** The META table's name. */
@ -139,10 +139,28 @@ public interface HConstants {
static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
// Other constants
/** used by scanners, etc when they want to start at the beginning of a region */
static final Text EMPTY_START_ROW = new Text();
/**
* An empty instance of Text.
*/
static final Text EMPTY_TEXT = new Text();
/**
* Used by scanners, etc when they want to start at the beginning of a region
*/
static final Text EMPTY_START_ROW = EMPTY_TEXT;
/** When we encode strings, we always specify UTF8 encoding */
static final String UTF8_ENCODING = "UTF-8";
/**
* Timestamp to use when we want to refer to the latest cell.
* This is the timestamp sent by clients when no timestamp is specified on
* commit.
*/
static final long LATEST_TIMESTAMP = Long.MAX_VALUE;
/**
* Define for 'return-all-versions'.
*/
static final int ALL_VERSIONS = -1;
}
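
Illustration (not part of the commit): LATEST_TIMESTAMP can double as a search origin because HStoreKey orders the versions of a cell newest-first, so a key carrying Long.MAX_VALUE sorts ahead of every stored version. A self-contained model with a plain TreeMap, whose descending comparator stands in for that ordering (an assumption of this sketch):

import java.util.Comparator;
import java.util.TreeMap;

public class LatestTimestampSketch {
  public static void main(String[] args) {
    // One cell's versions keyed by timestamp, newest first.
    TreeMap<Long, String> versions = new TreeMap<Long, String>(
        Comparator.<Long>reverseOrder());
    versions.put(10L, "v1");
    versions.put(20L, "v2");
    // A tail map from LATEST_TIMESTAMP (Long.MAX_VALUE) visits every
    // version, newest first, so 'latest' needs no special casing.
    System.out.println(versions.tailMap(Long.MAX_VALUE).values()); // [v2, v1]
  }
}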

HInternalScannerInterface.java

@ -19,11 +19,6 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
/**
* Internally, we need to be able to determine if the scanner is doing wildcard
* column matches (when only a column family is specified or if a column regex
@ -31,29 +26,7 @@ import org.apache.hadoop.io.Text;
* specified. If so, we need to ignore the timestamp to ensure that we get all
* the family members, as they may have been last updated at different times.
*/
public interface HInternalScannerInterface {
/**
* Grab the next row's worth of values. The HScanner will return the most
* recent data value for each row that is not newer than the target time.
*
* If a dataFilter is defined, it will be used to skip rows that do not
* match its criteria. It may cause the scanner to stop prematurely if it
* knows that it will no longer accept the remaining results.
*
* @param key HStoreKey containing row and timestamp
* @param results Map of column/value pairs
* @return true if a value was found
* @throws IOException
*/
public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
throws IOException;
/**
* Close the scanner.
*/
public void close();
public interface HInternalScannerInterface extends HScannerInterface {
/** @return true if the scanner is matching a column family or regex */
public boolean isWildcardScanner();
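
Illustration (not part of the commit): with next() and close() now inherited from HScannerInterface, consumers drive an internal scanner as below; the scanner variable and the process() callback are assumed for the sketch.

HStoreKey key = new HStoreKey();
SortedMap<Text, byte []> results = new TreeMap<Text, byte []>();
try {
  while (scanner.next(key, results)) {
    // For a wildcard scanner the map may carry several members of one
    // column family, possibly written at different timestamps.
    for (Map.Entry<Text, byte []> e: results.entrySet()) {
      process(key.getRow(), e.getKey(), e.getValue());
    }
    results.clear(); // the caller owns the map; reset it between rows
  }
} finally {
  scanner.close();
}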

HMemcache.java

@ -175,18 +175,15 @@ public class HMemcache {
* @return An array of byte arrays ordered by timestamp.
*/
public byte [][] get(final HStoreKey key, final int numVersions) {
List<byte []> results = new ArrayList<byte[]>();
this.lock.obtainReadLock();
try {
ArrayList<byte []> result =
get(memcache, key, numVersions - results.size());
results.addAll(0, result);
ArrayList<byte []> results = get(memcache, key, numVersions);
for (int i = history.size() - 1; i >= 0; i--) {
if (numVersions > 0 && results.size() >= numVersions) {
break;
}
result = get(history.elementAt(i), key, numVersions - results.size());
results.addAll(results.size(), result);
results.addAll(results.size(),
get(history.elementAt(i), key, numVersions - results.size()));
}
return (results.size() == 0)?
null: ImmutableBytesWritable.toArray(results);
@ -194,7 +191,6 @@ public class HMemcache {
this.lock.releaseReadLock();
}
}
/**
* Return all the available columns for the given key. The key indicates a
@ -248,7 +244,8 @@ public class HMemcache {
* @param map
* @param key
* @param numVersions
* @return Ordered list of items found in passed <code>map</code>
* @return Ordered list of items found in passed <code>map</code>. If no
* matching values, returns an empty list (does not return null).
*/
ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
final HStoreKey key, final int numVersions) {
@ -261,15 +258,10 @@ public class HMemcache {
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
if(HGlobals.deleteBytes.compareTo(es.getValue()) == 0) {
// TODO: Shouldn't this be a continue rather than a break? Perhaps
// the intent is that this DELETE_BYTES is meant to suppress older
// info -- see 5.4 Compactions in BigTable -- but how does this jibe
// with being able to remove one version only?
break;
if (!isDeleted(es.getValue())) {
result.add(tailMap.get(itKey));
curKey.setVersion(itKey.getTimestamp() - 1);
}
result.add(tailMap.get(itKey));
curKey.setVersion(itKey.getTimestamp() - 1);
}
if (numVersions > 0 && result.size() >= numVersions) {
break;
@ -278,6 +270,77 @@ public class HMemcache {
return result;
}
/**
* Get <code>versions</code> keys matching the origin key's
* row/column/timestamp and those of an older vintage.
* Default access so it can be accessed from {@link HRegionServer}.
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants#ALL_VERSIONS} to retrieve all.
* @return Ordered list of <code>versions</code> keys going from newest back.
* @throws IOException
*/
List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
this.lock.obtainReadLock();
try {
List<HStoreKey> results = getKeys(this.memcache, origin, versions);
for (int i = history.size() - 1; i >= 0; i--) {
results.addAll(results.size(), getKeys(history.elementAt(i), origin,
versions == HConstants.ALL_VERSIONS? versions:
(results != null? versions - results.size(): versions)));
}
return results;
} finally {
this.lock.releaseReadLock();
}
}
/*
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants#ALL_VERSIONS} to retrieve all.
* @return List of all keys that are of the same row and column and of
* equal or older timestamp. If no keys, returns an empty List. Does not
* return null.
*/
private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map,
final HStoreKey origin, final int versions) {
List<HStoreKey> result = new ArrayList<HStoreKey>();
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey key = es.getKey();
if (!key.matchesRowCol(origin)) {
break;
}
if (!isDeleted(es.getValue())) {
result.add(key);
if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
// We have enough results. Return.
break;
}
}
}
return result;
}
/**
* @param key
* @return True if the cell at <code>key</code> holds
* {@link HGlobals#deleteBytes}. Use when checking values in the store;
* on occasion only the memcache knows that the cell has been deleted.
*/
boolean isDeleted(final HStoreKey key) {
return isDeleted(this.memcache.get(key));
}
/**
* @param value
* @return True if the passed value is {@link HGlobals#deleteBytes}.
*/
boolean isDeleted(final byte [] value) {
return (value == null)? false: HGlobals.deleteBytes.compareTo(value) == 0;
}
/**
* Return a scanner over the keys in the HMemcache
*/
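
Illustration (not part of the commit): the read path above in miniature. Walk the live memcache and then each history snapshot from newest to oldest, skip cells carrying the delete marker, and stop once numVersions values are gathered. Plain Java, with a stand-in byte for HGlobals.deleteBytes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MemcacheGetModel {
  static final byte [] DELETE_MARKER = {0x7f}; // stand-in for deleteBytes

  // Each inner list holds one snapshot's values for a single row/column,
  // newest snapshot and newest version first, mirroring memcache followed
  // by the history Vector in get() above.
  static List<byte []> get(List<List<byte []>> snapshots, int numVersions) {
    List<byte []> results = new ArrayList<byte []>();
    for (List<byte []> snapshot: snapshots) {
      for (byte [] value: snapshot) {
        if (!Arrays.equals(value, DELETE_MARKER)) { // isDeleted() analogue
          results.add(value);
        }
        if (numVersions > 0 && results.size() >= numVersions) {
          return results; // enough versions gathered
        }
      }
    }
    return results; // possibly fewer than asked for; never null
  }
}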

HRegion.java

@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Vector;
@ -581,6 +582,9 @@ public class HRegion implements HConstants {
lock.obtainReadLock();
try {
HStore.HStoreSize biggest = largestHStore(midKey);
if (biggest == null) {
return false;
}
long triggerSize =
this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
boolean split = (biggest.getAggregate() >= triggerSize);
@ -911,26 +915,47 @@ public class HRegion implements HConstants {
}
}
/** Private implementation: get the value for the indicated HStoreKey */
private byte [][] get(HStoreKey key, int numVersions) throws IOException {
private byte [][] get(final HStoreKey key, final int numVersions)
throws IOException {
lock.obtainReadLock();
try {
// Check the memcache
byte [][] result = memcache.get(key, numVersions);
if(result != null) {
return result;
byte [][] memcacheResult = this.memcache.get(key, numVersions);
// If we got sufficient versions from memcache, return.
if (memcacheResult != null && memcacheResult.length == numVersions) {
return memcacheResult;
}
// If unavailable in memcache, check the appropriate HStore
// Check hstore for more versions.
Text colFamily = HStoreKey.extractFamily(key.getColumn());
HStore targetStore = stores.get(colFamily);
if(targetStore == null) {
return null;
// There are no stores. Return what we got from memcache.
return memcacheResult;
}
return targetStore.get(key, numVersions);
// Update the number of versions we need to fetch from the store.
int amendedNumVersions = numVersions;
if (memcacheResult != null) {
amendedNumVersions -= memcacheResult.length;
}
byte [][] result =
targetStore.get(key, amendedNumVersions, this.memcache);
if (result == null) {
result = memcacheResult;
} else if (memcacheResult != null) {
// We have results from both memcache and from stores. Put them
// together in an array in the proper order.
byte [][] storeResult = result;
result = new byte [memcacheResult.length + result.length][];
for (int i = 0; i < memcacheResult.length; i++) {
result[i] = memcacheResult[i];
}
for (int i = 0; i < storeResult.length; i++) {
result[i + memcacheResult.length] = storeResult[i];
}
}
return result;
} finally {
lock.releaseReadLock();
}
@ -962,6 +987,45 @@ public class HRegion implements HConstants {
}
}
/**
* Get all keys matching the origin key's row/column/timestamp and those
* of an older vintage.
* Default access so it can be accessed from {@link HRegionServer}.
* @param origin Where to start searching.
* @return Ordered list of keys going from newest back.
* @throws IOException
*/
List<HStoreKey> getKeys(final HStoreKey origin) throws IOException {
return getKeys(origin, ALL_VERSIONS);
}
/**
* Get <code>versions</code> keys matching the origin key's
* row/column/timestamp and those of an older vintage.
* Default access so it can be accessed from {@link HRegionServer}.
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants#ALL_VERSIONS} to retrieve all.
* @return Ordered list of <code>versions</code> keys going from newest back.
* @throws IOException
*/
List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
throws IOException {
List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
if (versions != ALL_VERSIONS && keys.size() >= versions) {
return keys;
}
// Check hstore for more versions.
Text colFamily = HStoreKey.extractFamily(origin.getColumn());
HStore targetStore = stores.get(colFamily);
if (targetStore != null) {
// Pass versions without modification: the store's getKeys counts the
// size of the passed <code>keys</code> list toward the limit.
keys = targetStore.getKeys(origin, keys, versions);
}
return keys;
}
/**
* Return an iterator that scans over the HRegion, returning the indicated
* columns for only the rows that match the data filter. This Iterator must be closed by the caller.
@ -1110,8 +1174,8 @@ public class HRegion implements HConstants {
}
/**
* Delete a value or write a value. This is a just a convenience method for put().
*
* Delete a value or write a value.
* This is just a convenience method for put().
* @param lockid lock id obtained from startUpdate
* @param targetCol name of column to be deleted
* @throws IOException
@ -1119,6 +1183,51 @@ public class HRegion implements HConstants {
public void delete(long lockid, Text targetCol) throws IOException {
localput(lockid, targetCol, HGlobals.deleteBytes.get());
}
/**
* Delete all cells of the same age as the passed timestamp or older.
* @param row
* @param column
* @param ts Delete all entries that have this timestamp or older
* @throws IOException
*/
public void deleteAll(final Text row, final Text column, final long ts)
throws IOException {
deleteMultiple(row, column, ts, ALL_VERSIONS);
}
/**
* Delete one or many cells.
* Used to support {@link #deleteAll(Text, Text, long)} and deletion of
* the latest cell.
* @param row
* @param column
* @param ts Timestamp to start search on.
* @param versions How many versions to delete. Pass
* {@link HConstants#ALL_VERSIONS} to delete all.
* @throws IOException
*/
void deleteMultiple(final Text row, final Text column, final long ts,
final int versions)
throws IOException {
lock.obtainReadLock();
try {
checkColumn(column);
HStoreKey origin = new HStoreKey(row, column, ts);
synchronized(row) {
List<HStoreKey> keys = getKeys(origin, versions);
if (keys.size() > 0) {
TreeMap<Text, byte []> edits = new TreeMap<Text, byte []>();
edits.put(column, HGlobals.deleteBytes.get());
for (HStoreKey key: keys) {
update(row, key.getTimestamp(), edits);
}
}
}
} finally {
lock.releaseReadLock();
}
}
/**
* Private implementation.
@ -1202,10 +1311,11 @@ public class HRegion implements HConstants {
* Once updates hit the change log, they are safe. They will either be moved
* into an HStore in the future, or they will be recovered from the log.
* @param lockid Lock for row we're to commit.
* @param timestamp the time to associate with this change
* @param timestamp the time to associate with this change.
* @throws IOException
*/
public void commit(final long lockid, long timestamp) throws IOException {
public void commit(final long lockid, final long timestamp)
throws IOException {
// Remove the row from the pendingWrites list so
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
@ -1216,19 +1326,75 @@ public class HRegion implements HConstants {
// This check makes sure that another thread from the client
// hasn't aborted/committed the write-operation
synchronized(row) {
// Add updates to the log and add values to the memcache.
Long lid = Long.valueOf(lockid);
TreeMap<Text, byte []> columns = this.targetColumns.get(lid);
if (columns != null && columns.size() > 0) {
log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
row, columns, timestamp);
memcache.add(row, columns, timestamp);
// OK, all done!
}
update(row, timestamp, this.targetColumns.get(lid));
targetColumns.remove(lid);
releaseRowLock(row);
}
}
/**
* This method is for unit testing only.
* Does each operation individually so can do appropriate
* {@link HConstants#LATEST_TIMESTAMP} action. Tries to mimic how
* {@link HRegionServer#batchUpdate(Text, long, org.apache.hadoop.hbase.io.BatchUpdate)}
* works when passed a timestamp of LATEST_TIMESTAMP.
* @param lockid Lock for row we're to commit.
* @throws IOException
* @see #commit(long, long)
*/
void commit(final long lockid) throws IOException {
// Remove the row from the pendingWrites list so
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
if(row == null) {
throw new LockException("No write lock for lockid " + lockid);
}
// This check makes sure that another thread from the client
// hasn't aborted/committed the write-operation
synchronized(row) {
Long lid = Long.valueOf(lockid);
TreeMap<Text, byte []> updatesByColumn = this.targetColumns.get(lid);
// Run updates one at a time so we can supply appropriate timestamp
long now = System.currentTimeMillis();
for (Map.Entry<Text, byte []>e: updatesByColumn.entrySet()) {
if (HGlobals.deleteBytes.equals(e.getValue())) {
// It's a delete. Delete the latest cell; deleteMultiple calls update
// for us. It re-acquires the row lock, but since we already hold it,
// that is fine.
deleteMultiple(row, e.getKey(), LATEST_TIMESTAMP, 1);
continue;
}
// Must be a 'put'.
TreeMap<Text, byte []> putEdit = new TreeMap<Text, byte []>();
putEdit.put(e.getKey(), e.getValue());
update(row, now, putEdit);
}
this.targetColumns.remove(lid);
releaseRowLock(row);
}
}
/*
* Add updates to the log and add values to the memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param row Row to update.
* @param timestamp Timestamp to record the updates against
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
private void update(final Text row, final long timestamp,
final TreeMap<Text, byte []> updatesByColumn)
throws IOException {
if (updatesByColumn == null || updatesByColumn.size() <= 0) {
return;
}
this.log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
row, updatesByColumn, timestamp);
this.memcache.add(row, updatesByColumn, timestamp);
}
//////////////////////////////////////////////////////////////////////////////
// Support code
@ -1250,7 +1416,11 @@ public class HRegion implements HConstants {
}
}
/** Make sure this is a valid column for the current table */
/**
* Make sure this is a valid column for the current table
* @param columnName
* @throws IOException
*/
void checkColumn(Text columnName) throws IOException {
Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
if(! regionInfo.tableDesc.hasFamily(family)) {
@ -1359,10 +1529,6 @@ public class HRegion implements HConstants {
dataFilter.reset();
}
this.scanners = new HInternalScannerInterface[stores.length + 1];
for(int i = 0; i < this.scanners.length; i++) {
this.scanners[i] = null;
}
this.resultSets = new TreeMap[scanners.length];
this.keys = new HStoreKey[scanners.length];
this.wildcardMatch = false;
@ -1424,12 +1590,11 @@ public class HRegion implements HConstants {
public boolean isMultipleMatchScanner() {
return multipleMatchers;
}
/**
* {@inheritDoc}
*/
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
throws IOException {
// Filtered flag is set by filters. If a cell has been 'filtered out'
// -- i.e. it is not to be returned to the caller -- the flag is 'true'.
boolean filtered = true;
boolean moreToFollow = true;
while (filtered && moreToFollow) {
@ -1446,19 +1611,27 @@ public class HRegion implements HConstants {
chosenTimestamp = keys[i].getTimestamp();
}
}
// Filter whole row by row key?
filtered = dataFilter != null? dataFilter.filter(chosenRow) : false;
// Store the key and results for each sub-scanner. Merge them as
// appropriate.
if (chosenTimestamp > 0 && !filtered) {
if (chosenTimestamp >= 0 && !filtered) {
// Here we are setting the passed in key with current row+timestamp
key.setRow(chosenRow);
key.setVersion(chosenTimestamp);
key.setColumn(new Text(""));
key.setColumn(HConstants.EMPTY_TEXT);
// Keep a list of deleted cell keys within this row. We need this
// because as we go through scanners, the delete record may be in an
// early scanner and then the same record with a non-delete, non-null
// value in a later one. Without history of what we've seen, we'll return
// deleted values. This List should not ever grow too large since we
// are only keeping rows and columns that match those set on the
// scanner and which have delete values. If memory usage becomes a
// problem, could redo as bloom filter.
List<HStoreKey> deletes = new ArrayList<HStoreKey>();
for (int i = 0; i < scanners.length && !filtered; i++) {
while ((scanners[i] != null
&& !filtered
&& moreToFollow)
@ -1481,8 +1654,19 @@ public class HRegion implements HConstants {
// but this had the effect of overwriting newer
// values with older ones. So now we only insert
// a result if the map does not contain the key.
HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT,
key.getTimestamp());
for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
if (!filtered && moreToFollow &&
hsk.setColumn(e.getKey());
if (HGlobals.deleteBytes.equals(e.getValue())) {
if (!deletes.contains(hsk)) {
// Key changes as we cycle the for loop so add a copy to
// the set of deletes.
deletes.add(new HStoreKey(hsk));
}
} else if (!deletes.contains(hsk) &&
!filtered &&
moreToFollow &&
!results.containsKey(e.getKey())) {
if (dataFilter != null) {
// Filter whole row by column data?
@ -1496,7 +1680,6 @@ public class HRegion implements HConstants {
results.put(e.getKey(), e.getValue());
}
}
resultSets[i].clear();
if (!scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
@ -1516,8 +1699,8 @@ public class HRegion implements HConstants {
}
}
}
moreToFollow = chosenTimestamp > 0;
moreToFollow = chosenTimestamp >= 0;
if (dataFilter != null) {
if (moreToFollow) {
@ -1533,6 +1716,17 @@ public class HRegion implements HConstants {
LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
}
}
if (results.size() <= 0 && !filtered) {
// There were no results found for this row. Mark it as
// 'filtered' out; otherwise we will not move on to the next row.
filtered = true;
}
}
// If we got no results, then there is no more to follow.
if (results == null || results.size() <= 0) {
moreToFollow = false;
}
// Make sure scanners closed if no more results
@ -1551,7 +1745,11 @@ public class HRegion implements HConstants {
/** Shut down a single scanner */
void closeScanner(int i) {
try {
scanners[i].close();
try {
scanners[i].close();
} catch (IOException e) {
LOG.warn("Failed closeing scanner " + i, e);
}
} finally {
scanners[i] = null;
keys[i] = null;
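
Illustration (not part of the commit): the heart of deleteMultiple() above is that nothing is physically removed. The timestamps of existing cells at or older than ts are looked up, and the delete marker is then written at exactly those timestamps so each version is shadowed in place. A self-contained model, with a TreeMap standing in for one cell's version store:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

public class DeleteMultipleModel {
  public static void main(String[] args) {
    // One row/column's versions keyed by timestamp, newest first.
    TreeMap<Long, String> cell = new TreeMap<Long, String>(
        Comparator.<Long>reverseOrder());
    cell.put(100L, "v3"); cell.put(50L, "v2"); cell.put(10L, "v1");

    long ts = 60L;      // delete everything this old or older
    int versions = -1;  // ALL_VERSIONS
    List<Long> hits = new ArrayList<Long>();
    for (Long t: cell.tailMap(ts).keySet()) { // t <= ts, newest first
      hits.add(t);
      if (versions != -1 && hits.size() >= versions) break;
    }
    for (Long t: hits) {
      cell.put(t, "<deleted>"); // update(row, t, deleteBytes) analogue
    }
    System.out.println(cell); // {100=v3, 50=<deleted>, 10=<deleted>}
  }
}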

HRegionInterface.java

@ -38,8 +38,8 @@ public interface HRegionInterface extends VersionedProtocol {
/**
* Get metainfo about an HRegion
*
* @param regionName - name of the region
* @return - HRegionInfo object for region
* @param regionName name of the region
* @return HRegionInfo object for region
* @throws NotServingRegionException
*/
public HRegionInfo getRegionInfo(final Text regionName)
@ -69,7 +69,7 @@ public interface HRegionInterface extends VersionedProtocol {
* @throws IOException
*/
public byte [][] get(final Text regionName, final Text row,
final Text column, final int numVersions)
final Text column, final int numVersions)
throws IOException;
/**
@ -107,7 +107,21 @@ public interface HRegionInterface extends VersionedProtocol {
* @param b BatchUpdate
* @throws IOException
*/
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException;
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException;
/**
* Delete all cells that match the passed row and column and whose
* timestamp is equal-to or older than the passed timestamp.
*
* @param regionName region name
* @param row row key
* @param column column key
* @param timestamp Delete all entries that have this timestamp or older
* @throws IOException
*/
public void deleteAll(Text regionName, Text row, Text column, long timestamp)
throws IOException;
//
// remote scanner interface

HRegionServer.java

@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
@ -1075,22 +1076,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
leases.renewLease(scannerId, scannerId);
// Collect values to be returned here
MapWritable values = new MapWritable();
// Keep getting rows until we find one that has at least one non-deleted column value
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
while (s.next(key, results)) {
for(Map.Entry<Text, byte []> e: results.entrySet()) {
HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
byte [] val = e.getValue();
if (HGlobals.deleteBytes.compareTo(val) == 0) {
// Column value is deleted. Don't return it.
continue;
}
values.put(k, new ImmutableBytesWritable(val));
values.put(new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()),
new ImmutableBytesWritable(e.getValue()));
}
if(values.size() > 0) {
@ -1099,7 +1091,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
// No data for this row, go get another.
results.clear();
}
return values;
@ -1110,26 +1101,46 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/** {@inheritDoc} */
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException {
throws IOException {
requestCount.incrementAndGet();
// If timestamp == LATEST_TIMESTAMP and we have deletes, then they need
// special treatment. For these we need to first find the latest cell so
// when we write the delete, we write it with the latest cell's timestamp
// so the delete record overshadows it. This means deletes and puts do not
// happen within the same row lock.
List<Text> deletes = null;
try {
long lockid = startUpdate(regionName, b.getRow());
for(BatchOperation op: b) {
switch(op.getOp()) {
case BatchOperation.PUT_OP:
case PUT:
put(regionName, lockid, op.getColumn(), op.getValue());
break;
case BatchOperation.DELETE_OP:
delete(regionName, lockid, op.getColumn());
case DELETE:
if (timestamp == LATEST_TIMESTAMP) {
// Save off these deletes.
if (deletes == null) {
deletes = new ArrayList<Text>();
}
deletes.add(op.getColumn());
} else {
delete(regionName, lockid, op.getColumn());
}
break;
}
}
commit(regionName, lockid, timestamp);
commit(regionName, lockid,
(timestamp == LATEST_TIMESTAMP)? System.currentTimeMillis(): timestamp);
if (deletes != null && deletes.size() > 0) {
// We have some LATEST_TIMESTAMP deletes to run.
HRegion r = getRegion(regionName);
for (Text column: deletes) {
r.deleteMultiple(b.getRow(), column, LATEST_TIMESTAMP, 1);
}
}
} catch (IOException e) {
checkFileSystem();
throw e;
@ -1158,7 +1169,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
return scannerId;
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
@ -1217,7 +1227,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
s = scanners.remove(this.scannerName);
}
if (s != null) {
s.close();
try {
s.close();
} catch (IOException e) {
LOG.error("Closing scanner", e);
}
}
}
}
@ -1241,10 +1255,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
protected void delete(Text regionName, long lockid, Text column)
throws IOException {
HRegion region = getRegion(regionName);
region.delete(lockid, column);
}
public void deleteAll(final Text regionName, final Text row,
final Text column, final long timestamp)
throws IOException {
HRegion region = getRegion(regionName);
region.deleteAll(row, column, timestamp);
}
protected void commit(Text regionName, final long lockid,
final long timestamp) throws IOException {
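
Illustration (not part of the commit): the timestamp policy batchUpdate() applies above, restated as two helpers. LATEST_TIMESTAMP is Long.MAX_VALUE per HConstants; the method names are invented for the sketch.

public class TimestampPolicySketch {
  static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

  // Puts committed without an explicit timestamp are stamped server-side.
  static long resolveCommitTimestamp(long clientTimestamp) {
    return clientTimestamp == LATEST_TIMESTAMP
        ? System.currentTimeMillis() : clientTimestamp;
  }

  // Deletes at LATEST_TIMESTAMP cannot be applied immediately: the newest
  // cell must be found first so the delete record lands on that cell's
  // own timestamp. They are saved aside and run after the puts commit,
  // via deleteMultiple(row, column, LATEST_TIMESTAMP, 1).
  static boolean mustDefer(boolean isDelete, long clientTimestamp) {
    return isDelete && clientTimestamp == LATEST_TIMESTAMP;
  }
}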

HScannerInterface.java

@ -30,9 +30,12 @@ import java.util.SortedMap;
*/
public interface HScannerInterface {
/**
* Get the next set of values
* Grab the next row's worth of values. The scanner will return the most
* recent data value for each row that is not newer than the target time
* passed when the scanner was created.
* @param key will contain the row and timestamp upon return
* @param results will contain an entry for each column family member and its value
* @param results will contain an entry for each column family member and its
* value
* @return true if data was returned
* @throws IOException
*/

HStore.java

@ -24,6 +24,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -86,7 +89,14 @@ class HStore implements HConstants {
final HLocking lock = new HLocking();
/* Sorted Map of store files keyed by sequence id (most recent should be
 * last in the list).
*/
TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
/* Sorted Map of readers keyed by sequence id (most recent should be last
 * in the list).
*/
TreeMap<Long, MapFile.Reader> readers = new TreeMap<Long, MapFile.Reader>();
Random rand = new Random();
@ -176,7 +186,7 @@ class HStore implements HConstants {
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
// corresponding one in 'loginfodir'. Without a corresponding log info
// file, the entry in 'mapdir' must be deleted.
Vector<HStoreFile> hstoreFiles
Collection<HStoreFile> hstoreFiles
= HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
for(HStoreFile hsf: hstoreFiles) {
this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
@ -446,30 +456,23 @@ class HStore implements HConstants {
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.bloomFilter);
// hbase.hstore.compact.on.flush=true enables picking up an existing
// HStoreFile from disk interlacing the memcache flush compacting as we
// go. The notion is that interlacing would take as long as a pure
// flush with the added benefit of having one less file in the store.
// Experiments show that it takes two to three times the amount of time
// flushing -- more column families makes it so the two timings come
// closer together -- but it also complicates the flush. Disabled for
// now. Needs work picking which file to interlace (favor references
// first, etc.)
// Here we tried picking up an existing HStoreFile from disk and
// interlacing the memcache flush compacting as we go. The notion was
// that interlacing would take as long as a pure flush with the added
// benefit of having one less file in the store. Experiments showed that
// it takes two to three times the amount of time flushing -- more column
// families makes it so the two timings come closer together -- but it
// also complicates the flush. The code was removed. Needed work picking
// which file to interlace (favor references first, etc.)
//
// Related, looks like 'merging compactions' in BigTable paper interlaces
// a memcache flush. We don't.
try {
if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) &&
this.storefiles.size() > 0) {
compact(out, inputCache.entrySet().iterator(),
this.readers.get(this.storefiles.firstKey()));
} else {
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
if (this.familyName.
equals(HStoreKey.extractFamily(curkey.getColumn()))) {
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
}
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
if (this.familyName.
equals(HStoreKey.extractFamily(curkey.getColumn()))) {
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
}
}
} finally {
@ -546,7 +549,6 @@ class HStore implements HConstants {
*
* We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
* @throws IOException
*/
void compact() throws IOException {
@ -564,6 +566,8 @@ class HStore implements HConstants {
* @param deleteSequenceInfo
* @param maxSeenSeqID We may have already calculated the maxSeenSeqID. If
* so, pass it here. Otherwise, pass -1 and it will be calculated inside
* this method.
* @throws IOException
*/
void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
@ -584,7 +588,7 @@ class HStore implements HConstants {
}
}
try {
Vector<HStoreFile> toCompactFiles = getFilesToCompact();
List<HStoreFile> toCompactFiles = getFilesToCompact();
HStoreFile compactedOutputFile =
new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
if (toCompactFiles.size() < 1 ||
@ -664,17 +668,21 @@ class HStore implements HConstants {
}
/*
* @return list of files to compact
* @return list of files to compact sorted so most recent comes first.
*/
private Vector<HStoreFile> getFilesToCompact() {
Vector<HStoreFile> toCompactFiles = null;
private List<HStoreFile> getFilesToCompact() {
List<HStoreFile> filesToCompact = null;
this.lock.obtainWriteLock();
try {
toCompactFiles = new Vector<HStoreFile>(storefiles.values());
// Storefiles are keyed by sequence id. The oldest file comes first.
// We need to return out of here a List that has the newest file as
// first.
filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
Collections.reverse(filesToCompact);
} finally {
this.lock.releaseWriteLock();
}
return toCompactFiles;
return filesToCompact;
}
/*
@ -694,7 +702,7 @@ class HStore implements HConstants {
* @throws IOException
*/
void compact(final MapFile.Writer compactedOut,
final Vector<HStoreFile> toCompactFiles)
final List<HStoreFile> toCompactFiles)
throws IOException {
int size = toCompactFiles.size();
CompactionReader[] rdrs = new CompactionReader[size];
@ -842,8 +850,14 @@ class HStore implements HConstants {
int timesSeen = 0;
Text lastRow = new Text();
Text lastColumn = new Text();
while(numDone < done.length) {
// Find the reader with the smallest key
// Map of the current row's deletes, keyed by column, with a list of
// timestamps for value
Map<Text, List<Long>> deletes = null;
while (numDone < done.length) {
// Find the reader with the smallest key. If two files have same key
// but different values -- i.e. one is delete and other is non-delete
// value -- we will find the first, the one that was written later and
// therefore the one whose value should make it out to the compacted
// store file.
int smallestKey = -1;
for(int i = 0; i < rdrs.length; i++) {
if(done[i]) {
@ -865,24 +879,23 @@ class HStore implements HConstants {
timesSeen++;
} else {
timesSeen = 1;
// We are on to a new row. Create a new deletes list.
deletes = new HashMap<Text, List<Long>>();
}
if(timesSeen <= family.getMaxVersions()) {
byte [] value = (vals[smallestKey] == null)?
null: vals[smallestKey].get();
if (!isDeleted(sk, value, null, deletes) &&
timesSeen <= family.getMaxVersions()) {
// Keep old versions until we have maxVersions worth.
// Then just skip them.
if(sk.getRow().getLength() != 0
&& sk.getColumn().getLength() != 0) {
if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
// Only write out objects which have a non-zero length key and
// value
compactedOut.append(sk, vals[smallestKey]);
}
}
// TODO: I don't know what to do about deleted values. I currently
// include the fact that the item was deleted as a legitimate
// "version" of the data. Maybe it should just drop the deleted
// val?
// Update last-seen items
lastRow.set(sk.getRow());
lastColumn.set(sk.getColumn());
@ -899,6 +912,52 @@ class HStore implements HConstants {
}
}
/*
* Check if this cell is deleted.
* If a memcache or a deletes map is passed in, first check whether the
* key already has a delete entry there. Otherwise, check whether the
* value is the <code>HGlobals.deleteBytes</code> marker. If the passed
* value IS deleteBytes, then it is added to the passed deletes map.
* @param hsk
* @param value
* @param memcache Can be null.
* @param deletes Map keyed by column with a value of timestamp. Can be null.
* If non-null and passed value is HGlobals.deleteBytes, then we add to this
* map.
* @return True if this is a deleted cell. Adds to the passed deletes map
* if the passed value is HGlobals.deleteBytes.
*/
private boolean isDeleted(final HStoreKey hsk, final byte [] value,
final HMemcache memcache, final Map<Text, List<Long>> deletes) {
if (memcache != null && memcache.isDeleted(hsk)) {
return true;
}
List<Long> timestamps = (deletes == null)?
null: deletes.get(hsk.getColumn());
if (timestamps != null &&
timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
return true;
}
if (value == null) {
// If a null value, shouldn't be in here. Mark it as deleted cell.
return true;
}
if (!HGlobals.deleteBytes.equals(value)) {
return false;
}
// Cell has delete value. Save it into deletes.
if (deletes != null) {
if (timestamps == null) {
timestamps = new ArrayList<Long>();
deletes.put(hsk.getColumn(), timestamps);
}
// We know its not already in the deletes array else we'd have returned
// earlier so no need to test if timestamps already has this value.
timestamps.add(Long.valueOf(hsk.getTimestamp()));
}
return true;
}
/*
* It's assumed that the compactLock will be acquired prior to calling this
* method! Otherwise, it is not thread-safe!
@ -1061,22 +1120,37 @@ class HStore implements HConstants {
* previous 'numVersions-1' values, as well.
*
* @param key
* @param numVersions Number of versions to fetch. Must be > 0.
* @param memcache Checked for deletions
* @return Matching values, newest first, or null if none found.
* @throws IOException
*/
byte [][] get(HStoreKey key, int numVersions) throws IOException {
byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
throws IOException {
if (numVersions <= 0) {
throw new IllegalArgumentException("Number of versions must be > 0");
}
List<byte []> results = new ArrayList<byte []>();
// Keep a ledger of deleted cell keys. We need this because as we go
// through the store files, the cell with the delete marker may be in one
// file and the old non-delete cell value in a later store file. If we
// don't keep around the fact that the cell was deleted in a newer record,
// we end up returning the old value if the user is asking for more than
// one version. This map should not grow large since we only keep cells
// that match the passed key and which have delete values. If memory
// usage becomes an issue, this could be redone as a bloom filter.
Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
// This code below is very close to the body of the getKeys method.
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray = getReaders();
for(int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
ImmutableBytesWritable readval = new ImmutableBytesWritable();
map.reset();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
if (readkey == null) {
// map.getClosest returns null if the passed key is > than the
@ -1085,27 +1159,31 @@ class HStore implements HConstants {
// BEFORE.
continue;
}
if (readkey.matchesRowCol(key)) {
if(readval.equals(HGlobals.deleteBytes)) {
if (!readkey.matchesRowCol(key)) {
continue;
}
if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
results.add(readval.get());
// Perhaps only one version is wanted. This test could happen later in
// the loop condition but it would cost the allocation of an
// ImmutableBytesWritable.
if (hasEnoughVersions(numVersions, results)) {
break;
}
results.add(readval.get());
readval = new ImmutableBytesWritable();
while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
if ((numVersions > 0 && (results.size() >= numVersions))
|| readval.equals(HGlobals.deleteBytes)) {
break;
}
}
while ((readval = new ImmutableBytesWritable()) != null &&
map.next(readkey, readval) &&
readkey.matchesRowCol(key) &&
!hasEnoughVersions(numVersions, results)) {
if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
results.add(readval.get());
readval = new ImmutableBytesWritable();
}
}
}
if(results.size() >= numVersions) {
if (hasEnoughVersions(numVersions, results)) {
break;
}
}
return results.size() == 0 ?
null : ImmutableBytesWritable.toArray(results);
} finally {
@ -1113,6 +1191,75 @@ class HStore implements HConstants {
}
}
private boolean hasEnoughVersions(final int numVersions,
final List<byte []> results) {
return numVersions > 0 && results.size() >= numVersions;
}
/**
* Get <code>versions</code> keys matching the origin key's
* row/column/timestamp and those of an older vintage.
* Default access so it can be accessed from {@link HRegionServer}.
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants#ALL_VERSIONS} to retrieve all. Versions will include
* size of passed <code>allKeys</code> in its count.
* @param allKeys List of keys prepopulated by keys we found in memcache.
* This method returns this passed list with all matching keys found in
* stores appended.
* @return The passed <code>allKeys</code> with <code>versions</code> of
* matching keys found in store files appended.
* @throws IOException
*/
List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
final int versions)
throws IOException {
if (allKeys == null) {
allKeys = new ArrayList<HStoreKey>();
}
// This code below is very close to the body of the get method.
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray = getReaders();
for(int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
map.reset();
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval);
if (readkey == null) {
// map.getClosest returns null if the passed key is > than the
// last key in the map file. getClosest is a bit of a misnomer
// since it returns exact match or the next closest key AFTER not
// BEFORE.
continue;
}
if (!readkey.matchesRowCol(origin)) {
continue;
}
if (!isDeleted(readkey, readval.get(), null, null) &&
!allKeys.contains(readkey)) {
allKeys.add(new HStoreKey(readkey));
}
while ((readval = new ImmutableBytesWritable()) != null &&
map.next(readkey, readval) &&
readkey.matchesRowCol(origin)) {
if (!isDeleted(readkey, readval.get(), null, null) &&
!allKeys.contains(readkey)) {
allKeys.add(new HStoreKey(readkey));
if (versions != ALL_VERSIONS && allKeys.size() >= versions) {
break;
}
}
}
}
}
return allKeys;
} finally {
this.lock.releaseReadLock();
}
}
/*
* Data structure to hold result of a look at store file sizes.
*/
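
Illustration (not part of the commit): the compaction-side delete handling hinges on read order. Within a row, the newer file is read first, so a delete marker is always seen before the older cell it shadows; the ledger of markers then suppresses both. A plain-Java sketch of that bookkeeping (a model, not the committed isDeleted()):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactionDeleteLedger {
  // deletes: per-row ledger, column -> timestamps seen with delete markers.
  static boolean suppress(String column, long timestamp, boolean isMarker,
      Map<String, List<Long>> deletes) {
    List<Long> stamps = deletes.get(column);
    if (stamps != null && stamps.contains(timestamp)) {
      return true; // a newer file already deleted this exact cell
    }
    if (!isMarker) {
      return false; // live cell: keep it in the compacted output
    }
    if (stamps == null) {
      stamps = new ArrayList<Long>();
      deletes.put(column, stamps);
    }
    stamps.add(timestamp); // remember it so older duplicates get dropped
    return true; // the marker itself is not written out either
  }
}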

HTable.java

@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@ -531,10 +530,11 @@ public class HTable implements HConstants {
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* Delete the value for a column.
* Deletes the cell whose row/column/commit-timestamp match those of the
* delete.
* @param lockid lock id returned from startUpdate
* @param column name of column whose value is to be deleted
*/
public void delete(long lockid, Text column) {
checkClosed();
@ -542,10 +542,60 @@ public class HTable implements HConstants {
batch.get().delete(lockid, column);
}
/**
* Delete all values for a column
*
* @param row Row to update
* @param column name of column whose value is to be deleted
* @throws IOException
*/
public void deleteAll(final Text row, final Text column) throws IOException {
deleteAll(row, column, LATEST_TIMESTAMP);
}
/**
* Delete all values for a column
*
* @param row Row to update
* @param column name of column whose value is to be deleted
* @param ts Delete all cells of the same timestamp or older.
* @throws IOException
*/
public void deleteAll(final Text row, final Text column, final long ts)
throws IOException {
checkClosed();
for(int tries = 0; tries < numRetries; tries++) {
HRegionLocation r = getRegionLocation(row);
HRegionInterface server =
connection.getHRegionConnection(r.getServerAddress());
try {
server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts);
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries == numRetries - 1) {
throw e;
}
if (LOG.isDebugEnabled()) {
LOG.debug("reloading table servers because: " + e.getMessage());
}
tableServers = connection.reloadTableServers(tableName);
}
try {
Thread.sleep(this.pause);
} catch (InterruptedException x) {
// continue
}
}
}
/**
* Abort a row mutation
*
* @param lockid - lock id returned from startUpdate
* @param lockid lock id returned from startUpdate
*/
public synchronized void abort(long lockid) {
checkClosed();
@ -558,24 +608,26 @@ public class HTable implements HConstants {
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* When this method is used, we pass the server a value that says use
* the 'latest' timestamp. If we are doing a put, on the server side cells
* will be given the server's current timestamp. If we are committing
* deletes, then delete removes the most recently modified cell of the
* stipulated column.
* @param lockid lock id returned from startUpdate
* @throws IOException
*/
public void commit(long lockid) throws IOException {
commit(lockid, System.currentTimeMillis());
commit(lockid, LATEST_TIMESTAMP);
}
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @param timestamp - time to associate with the change
* @param lockid lock id returned from startUpdate
* @param timestamp time to associate with the change
* @throws IOException
*/
public synchronized void commit(long lockid, long timestamp)
throws IOException {
checkClosed();
updateInProgress(true);
if (batch.get().getLockid() != lockid) {
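
Illustration (not part of the commit): the client-side surface after this change, as a usage sketch. The configuration variable and the table, row, and column names are hypothetical.

HTable table = new HTable(conf, new Text("mytable"));
Text row = new Text("row1");
Text column = new Text("contents:a");

long lockid = table.startUpdate(row);
table.put(lockid, column, "value".getBytes());
table.commit(lockid);          // sends LATEST_TIMESTAMP; server stamps it

lockid = table.startUpdate(row);
table.delete(lockid, column);  // removes only the most recent cell
table.commit(lockid);

// Remove every remaining cell at or older than the passed timestamp.
table.deleteAll(row, column, System.currentTimeMillis());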

BatchOperation.java

@ -27,24 +27,40 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
* batch update operation
* Batch update operations such as put, delete, and deleteAll.
*/
public class BatchOperation implements Writable {
/** put operation */
public static final int PUT_OP = 1;
/** delete operation */
public static final int DELETE_OP = 2;
private int op;
/**
* Operation types.
* @see org.apache.hadoop.io.SequenceFile.Writer
*/
public static enum Operation {PUT, DELETE}
private Operation op;
private Text column;
private byte[] value;
/** default constructor used by Writable */
public BatchOperation() {
this.op = 0;
this.column = new Text();
this.value = null;
this(new Text());
}
/**
* Creates a DELETE operation
*
* @param column column name
*/
public BatchOperation(final Text column) {
this(Operation.DELETE, column, null);
}
/**
* Creates a PUT operation
*
* @param column column name
* @param value column value
*/
public BatchOperation(final Text column, final byte [] value) {
this(Operation.PUT, column, value);
}
/**
@ -53,22 +69,12 @@ public class BatchOperation implements Writable {
* @param operation operation type: PUT or DELETE
* @param column column name
* @param value column value
*/
public BatchOperation(Text column, byte[] value) {
this.op = PUT_OP;
public BatchOperation(final Operation operation, final Text column,
final byte[] value) {
this.op = operation;
this.column = column;
this.value = value;
}
/**
* Creates a delete operation
*
* @param column name of column to delete
*/
public BatchOperation(Text column) {
this.op = DELETE_OP;
this.column = column;
this.value = null;
}
/**
* @return the column
@ -80,8 +86,8 @@ public class BatchOperation implements Writable {
/**
* @return the operation
*/
public int getOp() {
return op;
public Operation getOp() {
return this.op;
}
/**
@ -99,9 +105,10 @@ public class BatchOperation implements Writable {
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
op = in.readInt();
int ordinal = in.readInt();
this.op = Operation.values()[ordinal];
column.readFields(in);
if(op == PUT_OP) {
if (this.op == Operation.PUT) {
value = new byte[in.readInt()];
in.readFully(value);
}
@ -111,11 +118,11 @@ public class BatchOperation implements Writable {
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
out.writeInt(op);
out.writeInt(this.op.ordinal());
column.write(out);
if(op == PUT_OP) {
if (this.op == Operation.PUT) {
out.writeInt(value.length);
out.write(value);
}
}
}
}
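
Illustration (not part of the commit): the operation type now travels as its enum ordinal, which makes the wire format append-only. New constants may be added at the end of Operation, but reordering existing ones would corrupt previously written data. A self-contained round trip of the scheme:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OrdinalRoundTrip {
  enum Operation { PUT, DELETE }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    new DataOutputStream(buf).writeInt(Operation.DELETE.ordinal());
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    // readFields() above performs exactly this lookup:
    System.out.println(Operation.values()[in.readInt()]); // prints DELETE
  }
}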

BatchUpdate.java

@ -61,7 +61,7 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
*/
public BatchUpdate(long lockid) {
this.row = new Text();
this.lockid = Long.valueOf(Math.abs(lockid));
this.lockid = Math.abs(lockid);
this.operations = new ArrayList<BatchOperation>();
}
@ -97,27 +97,28 @@ public class BatchUpdate implements Writable, Iterable<BatchOperation> {
/**
* Change a value for the specified column
*
* @param lockid - lock id returned from startUpdate
* @param column - column whose value is being set
* @param val - new value for column
* @param lid lock id returned from startUpdate
* @param column column whose value is being set
* @param val new value for column
*/
public synchronized void put(final long lockid, final Text column,
public synchronized void put(final long lid, final Text column,
final byte val[]) {
if(this.lockid != lockid) {
throw new IllegalArgumentException("invalid lockid " + lockid);
if(this.lockid != lid) {
throw new IllegalArgumentException("invalid lockid " + lid);
}
operations.add(new BatchOperation(column, val));
}
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* Deletes the cell whose row/column/commit-timestamp match those of the
* delete.
* @param lid lock id returned from startUpdate
* @param column name of column whose value is to be deleted
*/
public synchronized void delete(final long lockid, final Text column) {
if(this.lockid != lockid) {
throw new IllegalArgumentException("invalid lockid " + lockid);
public synchronized void delete(final long lid, final Text column) {
if(this.lockid != lid) {
throw new IllegalArgumentException("invalid lockid " + lid);
}
operations.add(new BatchOperation(column));
}

ImmutableBytesWritable.java

@ -153,6 +153,9 @@ public class ImmutableBytesWritable implements WritableComparable {
/** {@inheritDoc} */
@Override
public boolean equals(Object right_obj) {
if (right_obj instanceof byte []) {
return compareTo((byte [])right_obj) == 0;
}
if (right_obj instanceof ImmutableBytesWritable) {
return compareTo(right_obj) == 0;
}
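
Illustration (not part of the commit): this overload is what lets call sites elsewhere in the commit write HGlobals.deleteBytes.equals(value) against a raw byte []. Note the asymmetry it introduces (bytes is an assumed byte [] variable):

ImmutableBytesWritable marker = new ImmutableBytesWritable(bytes);
marker.equals(bytes);                             // true, via the new branch
marker.equals(new ImmutableBytesWritable(bytes)); // true, as before
bytes.equals(marker);                             // false: array identity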

HBaseTestCase.java

@ -26,6 +26,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
import org.apache.hadoop.io.Text;
/**
@ -41,6 +42,7 @@ public abstract class HBaseTestCase extends TestCase {
protected static final char LAST_CHAR = 'z';
protected static final byte [] START_KEY_BYTES =
{FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
protected static final int MAXVERSIONS = 3;
static {
StaticTestEnvironment.initialize();
@ -100,10 +102,18 @@ public abstract class HBaseTestCase extends TestCase {
}
protected HTableDescriptor createTableDescriptor(final String name) {
return createTableDescriptor(name, MAXVERSIONS);
}
protected HTableDescriptor createTableDescriptor(final String name,
final int versions) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions,
CompressionType.NONE, false, Integer.MAX_VALUE, null));
htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions,
CompressionType.NONE, false, Integer.MAX_VALUE, null));
htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions,
CompressionType.NONE, false, Integer.MAX_VALUE, null));
return htd;
}
@ -123,18 +133,18 @@ public abstract class HBaseTestCase extends TestCase {
if (startKeyBytes == null || startKeyBytes.length == 0) {
startKeyBytes = START_KEY_BYTES;
}
addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1);
addContent(new HRegionIncommon(r), column, startKeyBytes, endKey, -1);
}
/**
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the form 'aaa', 'aab', etc. where key and value are the same.
* @param updater An instance of {@link Loader}.
* @param updater An instance of {@link Incommon}.
* @param column
* @throws IOException
*/
protected static void addContent(final Loader updater, final String column)
protected static void addContent(final Incommon updater, final String column)
throws IOException {
addContent(updater, column, START_KEY_BYTES, null);
}
@ -143,13 +153,13 @@ public abstract class HBaseTestCase extends TestCase {
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the form 'aaa', 'aab', etc. where key and value are the same.
* @param updater An instance of {@link Loader}.
* @param updater An instance of {@link Incommon}.
* @param column
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @throws IOException
*/
protected static void addContent(final Loader updater, final String column,
protected static void addContent(final Incommon updater, final String column,
final byte [] startKeyBytes, final Text endKey)
throws IOException {
addContent(updater, column, startKeyBytes, endKey, -1);
@ -159,14 +169,14 @@ public abstract class HBaseTestCase extends TestCase {
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the form 'aaa', 'aab', etc. where key and value are the same.
* @param updater An instance of {@link Loader}.
* @param updater An instance of {@link Incommon}.
* @param column
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @param ts Timestamp to write the content with.
* @throws IOException
*/
protected static void addContent(final Loader updater, final String column,
protected static void addContent(final Incommon updater, final String column,
final byte [] startKeyBytes, final Text endKey, final long ts)
throws IOException {
// Add rows of three characters. The first character starts with the
@ -207,23 +217,42 @@ public abstract class HBaseTestCase extends TestCase {
}
/**
* Interface used by the addContent methods so either a HTable or a HRegion
* can be passed to the methods.
* Implementors can flushcache.
*/
public static interface Loader {
public long startBatchUpdate(final Text row) throws IOException;
public void put(long lockid, Text column, byte val[]) throws IOException;
public void commit(long lockid) throws IOException;
public void commit(long lockid, long ts) throws IOException;
public void abort(long lockid) throws IOException;
public static interface FlushCache {
public void flushcache() throws IOException;
}
/**
* A class that makes a {@link Loader} out of a {@link HRegion}
* Interface used by tests so they can do common operations against an
* HTable or an HRegion.
*
* TODO: Come up w/ a better name for this interface.
*/
public static class HRegionLoader implements Loader {
public static interface Incommon {
public byte [] get(Text row, Text column) throws IOException;
public byte [][] get(Text row, Text column, int versions)
throws IOException;
public byte [][] get(Text row, Text column, long ts, int versions)
throws IOException;
public long startBatchUpdate(final Text row) throws IOException;
public void put(long lockid, Text column, byte val[]) throws IOException;
public void delete(long lockid, Text column) throws IOException;
public void deleteAll(Text row, Text column, long ts) throws IOException;
public void commit(long lockid) throws IOException;
public void commit(long lockid, long ts) throws IOException;
public void abort(long lockid) throws IOException;
public HScannerInterface getScanner(Text [] columns, Text firstRow,
long ts)
throws IOException;
}
/**
* A class that makes an {@link Incommon} out of an {@link HRegion}
*/
public static class HRegionIncommon implements Incommon {
final HRegion region;
public HRegionLoader(final HRegion HRegion) {
public HRegionIncommon(final HRegion HRegion) {
super();
this.region = HRegion;
}
@ -231,7 +260,7 @@ public abstract class HBaseTestCase extends TestCase {
this.region.abort(lockid);
}
public void commit(long lockid) throws IOException {
this.region.commit(lockid, System.currentTimeMillis());
this.region.commit(lockid);
}
public void commit(long lockid, final long ts) throws IOException {
this.region.commit(lockid, ts);
@ -239,17 +268,38 @@ public abstract class HBaseTestCase extends TestCase {
public void put(long lockid, Text column, byte[] val) throws IOException {
this.region.put(lockid, column, val);
}
public void delete(long lockid, Text column) throws IOException {
this.region.delete(lockid, column);
}
public void deleteAll(Text row, Text column, long ts) throws IOException {
this.region.deleteAll(row, column, ts);
}
public long startBatchUpdate(Text row) throws IOException {
return this.region.startUpdate(row);
}
public HScannerInterface getScanner(Text [] columns, Text firstRow,
long ts)
throws IOException {
return this.region.getScanner(columns, firstRow, ts, null);
}
public byte[] get(Text row, Text column) throws IOException {
return this.region.get(row, column);
}
public byte[][] get(Text row, Text column, int versions) throws IOException {
return this.region.get(row, column, versions);
}
public byte[][] get(Text row, Text column, long ts, int versions)
throws IOException {
return this.region.get(row, column, ts, versions);
}
}
/**
* A class that makes a {@link Loader} out of a {@link HTable}
* A class that makes an {@link Incommon} out of an {@link HTable}
*/
public static class HTableLoader implements Loader {
public static class HTableIncommon implements Incommon {
final HTable table;
public HTableLoader(final HTable table) {
public HTableIncommon(final HTable table) {
super();
this.table = table;
}
@ -265,8 +315,30 @@ public abstract class HBaseTestCase extends TestCase {
public void put(long lockid, Text column, byte[] val) throws IOException {
this.table.put(lockid, column, val);
}
public void delete(long lockid, Text column) throws IOException {
this.table.delete(lockid, column);
}
public void deleteAll(Text row, Text column, long ts) throws IOException {
this.table.deleteAll(row, column, ts);
}
public long startBatchUpdate(Text row) {
return this.table.startUpdate(row);
}
public HScannerInterface getScanner(Text [] columns, Text firstRow,
long ts)
throws IOException {
return this.table.obtainScanner(columns, firstRow, ts, null);
}
public byte[] get(Text row, Text column) throws IOException {
return this.table.get(row, column);
}
public byte[][] get(Text row, Text column, int versions)
throws IOException {
return this.table.get(row, column, versions);
}
public byte[][] get(Text row, Text column, long ts, int versions)
throws IOException {
return this.table.get(row, column, ts, versions);
}
}
}
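To make the intent of the Incommon and FlushCache abstractions concrete, here
is a minimal sketch, not part of this change, of a test core written once and
run against either implementation; the method name doTestCore and the row and
column literals are hypothetical.

  private void doTestCore(final Incommon in, final FlushCache flusher)
  throws IOException {
    // The same code drives a local HRegion (via HRegionIncommon) or a
    // remote HTable (via HTableIncommon).
    Text row = new Text("row1");
    Text column = new Text("contents:");
    long lockid = in.startBatchUpdate(row);
    in.put(lockid, column, "value".getBytes(HConstants.UTF8_ENCODING));
    // Committing without a timestamp writes at HConstants.LATEST_TIMESTAMP.
    in.commit(lockid);
    // Push the memcache out to store files so the read below hits disk.
    flusher.flushcache();
    byte [] bytes = in.get(row, column);
    // ... assertions against bytes go here ...
  }

A caller would pair new HRegionIncommon(r) with a FlushCache wrapping
r.flushcache(false), or new HTableIncommon(t) with one that calls
cluster.flushcache(), as the TestTimestamp changes below do.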

View File

@ -417,4 +417,15 @@ public class MiniHBaseCluster implements HConstants {
}
f.delete();
}
/**
* Call flushCache on all regions on all participating regionservers.
* @throws IOException
*/
void flushcache() throws IOException {
HRegionServer s = this.regionThreads.get(0).getRegionServer();
for(HRegion r: s.onlineRegions.values() ) {
r.flushcache(false);
}
}
}
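As a sketch of how a cluster test might use this new helper (the data-loading
step and conf are assumed from the test scaffolding):

  MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1);
  try {
    // ... load rows through an HTable ...
    // Flush every online region so later reads hit on-disk store files
    // rather than the memcache.
    cluster.flushcache();
    // ... repeat the same asserts against the flushed data ...
  } finally {
    cluster.shutdown();
  }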

View File

@ -54,7 +54,7 @@ public class MultiRegionTable extends HBaseTestCase {
HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
int count = count(meta, HConstants.COLUMN_FAMILY_STR);
HTable t = new HTable(conf, new Text(tableName));
addContent(new HTableLoader(t), columnName);
addContent(new HTableIncommon(t), columnName);
// All is running in the one JVM so I should be able to get the single
// region instance and bring on a split.

View File

@ -23,71 +23,133 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
/**
* Test compactions
*/
public class TestCompaction extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
private HLog hlog = null;
private HRegion r = null;
private static final String COLUMN_FAMILY = COLFAMILY_NAME1;
private static final Text STARTROW = new Text(START_KEY_BYTES);
private static final Text COLUMN_FAMILY_TEXT = new Text(COLUMN_FAMILY);
private static final Text COLUMN_FAMILY_TEXT_MINUS_COLON =
new Text(COLUMN_FAMILY.substring(0, COLUMN_FAMILY.length() - 1));
private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
@Override
public void setUp() throws Exception {
super.setUp();
this.hlog = new HLog(this.localFs, this.testDir, this.conf);
HTableDescriptor htd = createTableDescriptor(getName());
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
}
@Override
public void tearDown() throws Exception {
this.r.close();
this.hlog.closeAndDelete();
super.tearDown();
}
/**
* Run compaction concurrent with memcache flushing.
* Assert deletes get cleaned up.
* @throws Exception
*/
public void testCompaction() throws Exception {
HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
HTableDescriptor htd = createTableDescriptor(getName());
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
final HRegion r =
new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
try {
createStoreFile(r);
assertFalse(r.needsCompaction());
for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
createStoreFile(r);
assertFalse(r.needsCompaction());
int compactionThreshold =
this.conf.getInt("hbase.hstore.compactionThreshold", 3);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
assertTrue(r.needsCompaction());
// Add more content. Now there are about 5 versions of each column.
// Default is that there are only 3 (MAXVERSIONS) versions allowed per column.
// Assert > 3 now and then, after compaction, assert that only 3 versions
// are available.
addContent(new HRegionIncommon(r), COLUMN_FAMILY);
byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
// Assert that I can get >= 5 versions (there should be at least 5 in there).
assertTrue(bytes.length >= 5);
// Try to run compaction concurrent with a thread flush just to see that
// we can.
final HRegion region = this.r;
Thread t1 = new Thread() {
@Override
public void run() {
try {
region.flushcache(false);
} catch (IOException e) {
e.printStackTrace();
}
}
assertTrue(r.needsCompaction());
// Try to run compaction concurrent with a thread flush.
addContent(new HRegionLoader(r), COLFAMILY_NAME1);
Thread t1 = new Thread() {
@Override
public void run() {
try {
r.flushcache(false);
} catch (IOException e) {
e.printStackTrace();
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
assertTrue(region.compactStores());
} catch (IOException e) {
e.printStackTrace();
}
};
Thread t2 = new Thread() {
@Override
public void run() {
try {
assertTrue(r.compactStores());
} catch (IOException e) {
e.printStackTrace();
}
}
};
t1.setDaemon(true);
t1.start();
t2.setDaemon(true);
t2.start();
t1.join();
t2.join();
} finally {
r.close();
hlog.closeAndDelete();
}
};
t1.setDaemon(true);
t1.start();
t2.setDaemon(true);
t2.start();
t1.join();
t2.join();
// Now assert that there are 4 versions of a record only: that's the
// 3 versions that should be in the compacted store and then the one more
// we added when we compacted.
byte [] secondRowBytes = new byte[START_KEY_BYTES.length];
System.arraycopy(START_KEY_BYTES, 0, secondRowBytes, 0,
START_KEY_BYTES.length);
// Increment the least significant character so we get to next row.
secondRowBytes[START_KEY_BYTES.length - 1]++;
Text secondRow = new Text(secondRowBytes);
bytes = this.r.get(secondRow, COLUMN_FAMILY_TEXT, 100/*Too many*/);
assertTrue(bytes.length == 4);
// Now add deletes to memcache and then flush it. That will put us over
// the compaction threshold of 3 store files. Compacting these store files
// should result in a compacted store file that has no references to the
// deleted row.
this.r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis());
// Now, before compacting, remove all instances of the first row so we can
// verify that it is removed as we compact.
// Assert all deleted.
assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
this.r.flushcache(false);
assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
assertTrue(this.r.needsCompaction());
this.r.compactStores();
// Assert that the first row is still deleted.
bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
assertNull(bytes);
// Assert the store files do not have the first record 'aaa' keys in them.
for (MapFile.Reader reader:
this.r.stores.get(COLUMN_FAMILY_TEXT_MINUS_COLON).readers.values()) {
reader.reset();
HStoreKey key = new HStoreKey();
ImmutableBytesWritable val = new ImmutableBytesWritable();
while(reader.next(key, val)) {
assertFalse(key.getRow().equals(STARTROW));
}
}
}
private void createStoreFile(final HRegion r) throws IOException {
HRegionLoader loader = new HRegionLoader(r);
for (int i = 0; i < 3; i++) {
addContent(loader, COLFAMILY_NAME1);
private void createStoreFile(final HRegion region) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
for (int i = 0; i < 1; i++) {
addContent(loader, COLUMN_FAMILY);
}
r.flushcache(false);
region.flushcache(false);
}
}
}
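The heart of the new assertion can be restated compactly (a sketch condensed
from the test above, assuming an open HRegion r holding several versions of
STARTROW under COLUMN_FAMILY_TEXT):

  // Delete every cell at or older than 'now' for the row...
  r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis());
  // ...the row is gone from the memcache...
  assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, 100));
  // ...stays gone once the deletes are flushed to a store file...
  r.flushcache(false);
  assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, 100));
  // ...and, with HADOOP-1784, stays gone after compaction rewrites the
  // store files without the deleted cells.
  r.compactStores();
  assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, 100));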

View File

@ -45,32 +45,27 @@ public class TestMasterAdmin extends HBaseClusterTestCase {
admin.disableTable(testDesc.getName());
try {
try {
@SuppressWarnings("unused")
HTable table = new HTable(conf, testDesc.getName());
} catch(IllegalStateException e) {
// Expected
}
admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:"));
admin.enableTable(testDesc.getName());
try {
admin.deleteColumn(testDesc.getName(), new Text("col2:"));
} catch(TableNotDisabledException e) {
// Expected
}
admin.disableTable(testDesc.getName());
admin.deleteColumn(testDesc.getName(), new Text("col2:"));
} catch(IllegalStateException e) {
// Expected
} catch(Exception e) {
e.printStackTrace();
fail();
} finally {
admin.deleteTable(testDesc.getName());
// This exception is not actually thrown. It doesn't look like it should
// be thrown since the connection manager is already filled w/ data
// -- noticed by St.Ack 09/09/2007
}
admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:"));
admin.enableTable(testDesc.getName());
try {
admin.deleteColumn(testDesc.getName(), new Text("col2:"));
} catch(TableNotDisabledException e) {
// Expected
}
admin.disableTable(testDesc.getName());
admin.deleteColumn(testDesc.getName(), new Text("col2:"));
admin.deleteTable(testDesc.getName());
}
}

View File

@ -172,11 +172,12 @@ public class TestScanner2 extends HBaseClusterTestCase {
* @throws IOException
*/
public void testSplitDeleteOneAddTwoRegions() throws IOException {
HTable metaTable = new HTable(conf, HConstants.META_TABLE_NAME);
// First add a new table. Its initial region will be added to the META region.
HBaseAdmin admin = new HBaseAdmin(conf);
Text tableName = new Text(getName());
admin.createTable(new HTableDescriptor(tableName.toString()));
List<HRegionInfo> regions = scan(conf, HConstants.META_TABLE_NAME);
List<HRegionInfo> regions = scan(conf, metaTable);
assertEquals("Expected one region", regions.size(), 1);
HRegionInfo region = regions.get(0);
assertTrue("Expected region named for test",
@ -196,10 +197,10 @@ public class TestScanner2 extends HBaseClusterTestCase {
homedir, this.conf, null));
try {
for (HRegion r : newRegions) {
addRegionToMETA(conf, HConstants.META_TABLE_NAME, r,
this.cluster.getHMasterAddress(), -1L);
addRegionToMETA(conf, metaTable, r, this.cluster.getHMasterAddress(),
-1L);
}
regions = scan(conf, HConstants.META_TABLE_NAME);
regions = scan(conf, metaTable);
assertEquals("Should be two regions only", 2, regions.size());
} finally {
for (HRegion r : newRegions) {
@ -209,14 +210,13 @@ public class TestScanner2 extends HBaseClusterTestCase {
}
}
private List<HRegionInfo> scan(final Configuration conf, final Text table)
private List<HRegionInfo> scan(final Configuration conf, final HTable t)
throws IOException {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
HRegionInterface regionServer = null;
long scannerId = -1L;
try {
HTable t = new HTable(conf, table);
HRegionLocation rl = t.getRegionLocation(table);
HRegionLocation rl = t.getRegionLocation(t.getTableName());
regionServer = t.getConnection().getHRegionConnection(rl.getServerAddress());
scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(),
HConstants.COLUMN_FAMILY_ARRAY, new Text(),
@ -263,25 +263,24 @@ public class TestScanner2 extends HBaseClusterTestCase {
}
private void addRegionToMETA(final Configuration conf,
final Text table, final HRegion region,
final HTable t, final HRegion region,
final HServerAddress serverAddress,
final long startCode)
throws IOException {
HTable t = new HTable(conf, table);
try {
long lockid = t.startUpdate(region.getRegionName());
t.put(lockid, HConstants.COL_REGIONINFO,
Writables.getBytes(region.getRegionInfo()));
t.put(lockid, HConstants.COL_SERVER,
Writables.stringToBytes(serverAddress.toString()));
t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode));
t.commit(lockid);
if (LOG.isDebugEnabled()) {
LOG.info("Added region " + region.getRegionName() + " to table " +
table);
}
} finally {
t.close();
long lockid = t.startUpdate(region.getRegionName());
t.put(lockid, HConstants.COL_REGIONINFO,
Writables.getBytes(region.getRegionInfo()));
t.put(lockid, HConstants.COL_SERVER,
Writables.stringToBytes(serverAddress.toString()));
t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode));
t.commit(lockid);
// Assert added.
byte [] bytes = t.get(region.getRegionName(), HConstants.COL_REGIONINFO);
HRegionInfo hri = Writables.getHRegionInfo(bytes);
assertEquals(hri.getRegionId(), region.getRegionId());
if (LOG.isDebugEnabled()) {
LOG.info("Added region " + region.getRegionName() + " to table " +
t.getTableName());
}
}

View File

@ -21,10 +21,14 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
/**
* Tests user specifiable time stamps
* Tests user-specifiable timestamps for puts, gets and scans. Also
* tests the same in the presence of deletes. Test cores are written so they
* can be run against an HRegion and against an HTable: i.e. both local and
* remote.
*/
public class TestTimestamp extends HBaseTestCase {
private static final long T0 = 10L;
@ -32,74 +36,196 @@ public class TestTimestamp extends HBaseTestCase {
private static final long T2 = 200L;
private static final String COLUMN_NAME = "contents:";
private static final String TABLE_NAME = "test";
private static final String VERSION1 = "version1";
private static final String LATEST = "latest";
private static final Text COLUMN = new Text(COLUMN_NAME);
private static final Text[] COLUMNS = {
COLUMN
};
private static final Text TABLE = new Text(TABLE_NAME);
private static final Text[] COLUMNS = {COLUMN};
private static final Text ROW = new Text("row");
// When creating the column descriptor, how many versions of a cell to allow.
private static final int VERSIONS = 3;
/**
* Test that delete works according to description in <a
* href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>
* when it comes to timestamps.
* href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>.
* @throws IOException
*/
public void testDelete() throws IOException {
HRegion r = createRegion();
final HRegion r = createRegion();
try {
HRegionLoader loader = new HRegionLoader(r);
// Add a couple of values for three different timestamps.
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T2);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
// If I delete w/o specifying a timestamp, this means I'm deleting the
// latest.
delete(r, System.currentTimeMillis());
// Verify that I get back T2 through T0.
doTestDelete(new HRegionIncommon(r), new FlushCache() {
public void flushcache() throws IOException {
r.flushcache(false);
}
});
} finally {
r.close();
r.getLog().closeAndDelete();
}
}
private void delete(final HRegion r, final long ts) throws IOException {
long lockid = r.startUpdate(ROW);
r.delete(lockid, COLUMN);
r.commit(lockid, ts == -1? System.currentTimeMillis(): ts);
}
/**
* Test scanning against different timestamps.
* @throws IOException
*/
public void testTimestampScanning() throws IOException {
HRegion r = createRegion();
final HRegion r = createRegion();
try {
HRegionLoader loader = new HRegionLoader(r);
// Add a couple of values for three different timestamps.
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
// Get count of latest items.
int count = assertScanContentTimestamp(r, System.currentTimeMillis());
// Assert I get same count when I scan at each timestamp.
assertEquals(count, assertScanContentTimestamp(r, T0));
assertEquals(count, assertScanContentTimestamp(r, T1));
// Flush everything out to disk and then retry
r.flushcache(false);
assertEquals(count, assertScanContentTimestamp(r, T0));
assertEquals(count, assertScanContentTimestamp(r, T1));
doTestTimestampScanning(new HRegionIncommon(r), new FlushCache() {
public void flushcache() throws IOException {
r.flushcache(false);
}
});
} finally {
r.close();
r.getLog().closeAndDelete();
}
}
/**
* Basic test of timestamps.
* Do the above tests from client side.
* @throws IOException
*/
public void testTimestamps() throws IOException {
final MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
try {
HTable t = createTable();
Incommon incommon = new HTableIncommon(t);
doTestDelete(incommon, new FlushCache() {
public void flushcache() throws IOException {
cluster.flushcache();
}
});
// Perhaps drop and readd the table between tests so the former does
// not pollute the latter? Or put them into separate tests.
doTestTimestampScanning(incommon, new FlushCache() {
public void flushcache() throws IOException {
cluster.flushcache();
}
});
} catch (Exception e) {
cluster.shutdown();
}
}
/*
* Run test that delete works according to description in <a
* href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>.
* @param incommon
* @param flusher
* @throws IOException
*/
private void doTestDelete(final Incommon incommon, FlushCache flusher)
throws IOException {
// Add values at various timestamps (Values are timestamps as bytes).
put(incommon, T0);
put(incommon, T1);
put(incommon, T2);
put(incommon);
// Verify that returned versions match passed timestamps.
assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T2, T1});
// If I delete w/o specifying a timestamp, this means I'm deleting the
// latest.
delete(incommon);
// Verify that I get back T2 through T0 -- that the latest version has
// been deleted.
assertVersions(incommon, new long [] {T2, T1, T0});
// Flush everything out to disk and then retry
flusher.flushcache();
assertVersions(incommon, new long [] {T2, T1, T0});
// Now add back a latest so I can test removing other than the latest.
put(incommon);
assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T2, T1});
delete(incommon, T2);
assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T1, T0});
// Flush everything out to disk and then retry
flusher.flushcache();
assertVersions(incommon, new long [] {HConstants.LATEST_TIMESTAMP, T1, T0});
// Now try deleting all from T2 back inclusive (We first need to add T2
// back into the mix and, to make things a little interesting, delete and
// then readd T1).
put(incommon, T2);
delete(incommon, T1);
put(incommon, T1);
incommon.deleteAll(ROW, COLUMN, T2);
// Should only be the current value in the set. Assert this is so.
assertOnlyLatest(incommon, HConstants.LATEST_TIMESTAMP);
// Flush everything out to disk and then redo above tests
flusher.flushcache();
assertOnlyLatest(incommon, HConstants.LATEST_TIMESTAMP);
}
private void assertOnlyLatest(final Incommon incommon,
final long currentTime)
throws IOException {
byte [][] bytesBytes = incommon.get(ROW, COLUMN, 3/*Ask for too much*/);
assertEquals(1, bytesBytes.length);
long time = Writables.bytesToLong(bytesBytes[0]);
assertEquals(time, currentTime);
assertNull(incommon.get(ROW, COLUMN, T1, 3 /*Too many*/));
assertTrue(assertScanContentTimestamp(incommon, T1) == 0);
}
/*
* Assert that returned versions match passed-in timestamps and that results
* are returned in the right order. Assert that values, when converted to
* longs, match the corresponding passed timestamp.
* @param r
* @param tss
* @throws IOException
*/
private void assertVersions(final Incommon incommon, final long [] tss)
throws IOException {
// Assert that 'latest' is what we expect.
byte [] bytes = incommon.get(ROW, COLUMN);
assertEquals(Writables.bytesToLong(bytes), tss[0]);
// Now assert that if we ask for multiple versions, that they come out in
// order.
byte [][] bytesBytes = incommon.get(ROW, COLUMN, tss.length);
assertEquals(bytesBytes.length, tss.length);
for (int i = 0; i < bytesBytes.length; i++) {
long ts = Writables.bytesToLong(bytesBytes[i]);
assertEquals(ts, tss[i]);
}
// Specify a timestamp and get multiple versions.
bytesBytes = incommon.get(ROW, COLUMN, tss[0], bytesBytes.length - 1);
for (int i = 1; i < bytesBytes.length; i++) {
long ts = Writables.bytesToLong(bytesBytes[i]);
assertEquals(ts, tss[i]);
}
// Test scanner returns expected version
assertScanContentTimestamp(incommon, tss[0]);
}
/*
* Run test scanning different timestamps.
* @param incommon
* @param flusher
* @throws IOException
*/
private void doTestTimestampScanning(final Incommon incommon,
final FlushCache flusher)
throws IOException {
// Add a couple of values for three different timestamps.
put(incommon, T0);
put(incommon, T1);
put(incommon, HConstants.LATEST_TIMESTAMP);
// Get count of latest items.
int count = assertScanContentTimestamp(incommon,
HConstants.LATEST_TIMESTAMP);
// Assert I get same count when I scan at each timestamp.
assertEquals(count, assertScanContentTimestamp(incommon, T0));
assertEquals(count, assertScanContentTimestamp(incommon, T1));
// Flush everything out to disk and then retry
flusher.flushcache();
assertEquals(count, assertScanContentTimestamp(incommon, T0));
assertEquals(count, assertScanContentTimestamp(incommon, T1));
}
/*
* Assert that the scan returns only values <= timestamp.
@ -108,19 +234,21 @@ public class TestTimestamp extends HBaseTestCase {
* @return Count of items scanned.
* @throws IOException
*/
private int assertScanContentTimestamp(final HRegion r, final long ts)
private int assertScanContentTimestamp(final Incommon in, final long ts)
throws IOException {
HScannerInterface scanner =
in.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts);
int count = 0;
HInternalScannerInterface scanner =
r.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts, null);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []>value = new TreeMap<Text, byte[]>();
while (scanner.next(key, value)) {
assertTrue(key.getTimestamp() <= ts);
Text row = key.getRow();
assertEquals(row.toString(),
new String(value.get(COLUMN), HConstants.UTF8_ENCODING));
// Content matches the key or HConstants.LATEST_TIMESTAMP.
// (Key does not match content if we 'put' with LATEST_TIMESTAMP).
long l = Writables.bytesToLong(value.get(COLUMN));
assertTrue(key.getTimestamp() == l ||
HConstants.LATEST_TIMESTAMP == l);
count++;
value.clear();
}
@ -129,118 +257,48 @@ public class TestTimestamp extends HBaseTestCase {
}
return count;
}
/**
* Basic test of timestamps.
* TODO: Needs rewrite after hadoop-1784 gets fixed.
* @throws IOException
*/
public void testTimestamps() throws IOException {
MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
try {
HTable table = createTable();
// store a value specifying an update time
put(table, VERSION1.getBytes(HConstants.UTF8_ENCODING), T0);
// store a value specifying 'now' as the update time
put(table, LATEST.getBytes(HConstants.UTF8_ENCODING), -1);
// delete values older than T1
long lockid = table.startUpdate(ROW);
table.delete(lockid, COLUMN);
table.commit(lockid, T1);
// now retrieve...
assertGets(table);
// flush everything out to disk
HRegionServer s = cluster.regionThreads.get(0).getRegionServer();
for(HRegion r: s.onlineRegions.values() ) {
r.flushcache(false);
}
// now retrieve...
assertGets(table);
// Test scanners
assertScanCount(table, -1, 1);
assertScanCount(table, T1, 0);
} catch (Exception e) {
cluster.shutdown();
}
}
/*
* Test count of results scanning.
* @param table
* @param ts
* @param expectedCount
* @throws IOException
*/
private void assertScanCount(final HTable table, final long ts,
final int expectedCount)
private void put(final Incommon loader, final long ts)
throws IOException {
HScannerInterface scanner = (ts == -1)?
table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW):
table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts);
try {
HStoreKey key = new HStoreKey();
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
int count = 0;
while(scanner.next(key, results)) {
count++;
}
assertEquals(count, expectedCount);
assertEquals(results.size(), expectedCount);
} finally {
scanner.close();
}
put(loader, Writables.longToBytes(ts), ts);
}
private void put(final Incommon loader)
throws IOException {
long ts = HConstants.LATEST_TIMESTAMP;
put(loader, Writables.longToBytes(ts), ts);
}
/*
* Test can do basic gets.
* Used by testTimestamp above.
* @param table
* @throws IOException
*/
private void assertGets(final HTable table) throws IOException {
// the most recent version:
byte[] bytes = table.get(ROW, COLUMN);
assertTrue(bytes != null && bytes.length != 0);
assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
// any version <= time T1
byte[][] values = table.get(ROW, COLUMN, T1, 3);
assertNull(values);
// the version from T0
values = table.get(ROW, COLUMN, T0, 3);
assertTrue(values.length == 1
&& VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
// three versions older than now
values = table.get(ROW, COLUMN, 3);
assertTrue(values.length == 1
&& LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
}
/*
* Put values.
* @param table
* @param loader
* @param bytes
* @param ts
* @throws IOException
*/
private void put(final HTable table, final byte [] bytes, final long ts)
private void put(final Incommon loader, final byte [] bytes,
final long ts)
throws IOException {
long lockid = table.startUpdate(ROW);
table.put(lockid, COLUMN, bytes);
if (ts == -1) {
table.commit(lockid);
long lockid = loader.startBatchUpdate(ROW);
loader.put(lockid, COLUMN, bytes);
if (ts == HConstants.LATEST_TIMESTAMP) {
loader.commit(lockid);
} else {
table.commit(lockid, ts);
loader.commit(lockid, ts);
}
}
private void delete(final Incommon loader) throws IOException {
delete(loader, HConstants.LATEST_TIMESTAMP);
}
private void delete(final Incommon loader, final long ts) throws IOException {
long lockid = loader.startBatchUpdate(ROW);
loader.delete(lockid, COLUMN);
if (ts == HConstants.LATEST_TIMESTAMP) {
loader.commit(lockid);
} else {
loader.commit(lockid, ts);
}
}
@ -250,17 +308,18 @@ public class TestTimestamp extends HBaseTestCase {
* @throws IOException
*/
private HTable createTable() throws IOException {
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
HTableDescriptor desc = new HTableDescriptor(getName());
desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
return new HTable(conf, TABLE);
return new HTable(conf, new Text(getName()));
}
private HRegion createRegion() throws IOException {
HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
HTableDescriptor htd = createTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(COLUMN_NAME));
htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
CompressionType.NONE, false, Integer.MAX_VALUE, null));
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
}
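Taken together, the client-visible semantics this change pins down can be
sketched as follows (assumes a running cluster, a table named "test" with
family "contents:", and conf from the test scaffolding):

  HTable t = new HTable(conf, new Text("test"));
  Text row = new Text("row");
  Text column = new Text("contents:");
  // Put at an explicit timestamp.
  long lockid = t.startUpdate(row);
  t.put(lockid, column, Writables.longToBytes(100L));
  t.commit(lockid, 100L);
  // delete plus a commit with no timestamp removes only the latest cell...
  lockid = t.startUpdate(row);
  t.delete(lockid, column);
  t.commit(lockid);
  // ...whereas deleteAll removes every cell equal to or older than the
  // passed timestamp.
  t.deleteAll(row, column, 100L);
  // Nothing at or before T=100 survives.
  assertNull(t.get(row, column, 100L, 3));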