HBASE-2248 Provide new non-copy mechanism to assure atomic reads in get and scan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@944529 13f79535-47bb-0310-9956-ffa450edef68
Ryan Rawson 2010-05-15 00:18:37 +00:00
parent e593f0efbf
commit a026befba3
19 changed files with 1163 additions and 446 deletions

View File

@ -201,6 +201,23 @@ public class KeyValue implements Writable, HeapSize {
private int offset = 0;
private int length = 0;
/** Here be dragons **/
// used to achieve atomic operations in the memstore.
public long getMemstoreTS() {
return memstoreTS;
}
public void setMemstoreTS(long memstoreTS) {
this.memstoreTS = memstoreTS;
}
// default value is 0, aka DNC
private long memstoreTS = 0;
/** Dragon time over, return to normal business */
/** Writable Constructor -- DO NOT USE */
public KeyValue() {}
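The new memstoreTS field is the heart of this change: writers stamp each
KeyValue with their write number before inserting it into the memstore, and
readers ignore entries stamped above their read point. A minimal sketch of
the visibility rule, using the ReadWriteConsistencyControl class added later
in this commit (kvset stands in for the memstore's sorted set):

    long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
    for (KeyValue kv : kvset) {
      if (kv.getMemstoreTS() <= readPoint) {
        // this entry is visible to the current reader
      }
    }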
@ -1468,6 +1485,21 @@ public class KeyValue implements Writable, HeapSize {
}
/**
* Creates a KeyValue that is last on the specified row id. That is,
* every other possible KeyValue for the given row would compareTo()
* less than the result of this call.
* @param row row key
* @return Last possible KeyValue on passed <code>row</code>
*/
public static KeyValue createLastOnRow(final byte[] row) {
return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
}
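Together, createFirstOnRow() and createLastOnRow() bracket every real
KeyValue on a row, which is what single-row (get) scans rely on. A rough
illustration (COMPARATOR is KeyValue.COMPARATOR; see also
TestKeyValue.testFirstLastOnRow() added in this commit):

    byte[] row = Bytes.toBytes("r1");
    KeyValue first = KeyValue.createFirstOnRow(row);
    KeyValue last = KeyValue.createLastOnRow(row);
    // for any real KeyValue kv stored on "r1":
    //   COMPARATOR.compare(first, kv) < 0 && COMPARATOR.compare(kv, last) < 0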
/**
* Create a KeyValue that is smaller than all other possible KeyValues
* for the given row. That is, any (valid) KeyValue on 'row' would sort
* _after_ the result.
*
* @param row - row key (arbitrary byte array)
* @return First possible KeyValue on passed <code>row</code>
*/
@ -1476,6 +1508,8 @@ public class KeyValue implements Writable, HeapSize {
}
/**
* Creates a KeyValue that is smaller than all other KeyValues that
* are older than the passed timestamp.
* @param row - row key (arbitrary byte array)
* @param ts - timestamp
* @return First possible key on passed <code>row</code> and timestamp.
@ -1487,8 +1521,11 @@ public class KeyValue implements Writable, HeapSize {
/**
* @param row - row key (arbitrary byte array)
* @param c column - {@link #parseColumn(byte[])} is called to split
* the column.
* @param ts - timestamp
* @return First possible key on passed <code>row</code>, column and timestamp
* @deprecated
*/
public static KeyValue createFirstOnRow(final byte [] row, final byte [] c,
final long ts) {
@ -1497,14 +1534,17 @@ public class KeyValue implements Writable, HeapSize {
}
/**
* Create a KeyValue for the specified row, family and qualifier that would be
* smaller than all other possible KeyValues that have the same row, family, and qualifier.
* Used for seeking.
* @param row - row key (arbitrary byte array)
* @param f - family name
* @param q - column qualifier
* @param family - family name
* @param qualifier - column qualifier
* @return First possible key on passed <code>row</code>, and column.
*/
public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
final byte [] q) {
return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum);
public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
final byte [] qualifier) {
return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
}
/**
@ -1689,9 +1729,6 @@ public class KeyValue implements Writable, HeapSize {
return compare;
}
// if row matches, and no column in the 'left' AND put type is 'minimum',
// then return that left is larger than right.
// Compare column family. Start compare past row and family length.
int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
@ -1700,17 +1737,25 @@ public class KeyValue implements Writable, HeapSize {
int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
(rcolumnoffset - roffset);
// if row matches, and no column in the 'left' AND put type is 'minimum',
// then return that left is larger than right.
// This supports 'last key on a row': if the left operand has no column and
// its type byte is Type.Minimum (code 0), we declare the left bigger. This
// lets us seek to the last key on a row.
byte ltype = left[loffset + (llength - 1)];
byte rtype = right[roffset + (rlength - 1)];
if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
return 1; // left is bigger.
}
if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
return -1;
}
// TODO the family and qualifier should be compared separately
compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right,
rcolumnoffset, rcolumnlength);
if (compare != 0) {
@ -1732,9 +1777,6 @@ public class KeyValue implements Writable, HeapSize {
if (!this.ignoreType) {
// Compare types. Let the delete types sort ahead of puts; i.e. types
// of higher numbers sort before those of lesser numbers
// ltype is defined above
byte rtype = right[roffset + (rlength - 1)];
return (0xff & rtype) - (0xff & ltype);
}
return 0;
@ -1772,9 +1814,10 @@ public class KeyValue implements Writable, HeapSize {
// HeapSize
public long heapSize() {
return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
ClassSize.align(ClassSize.ARRAY + length) +
(2 * Bytes.SIZEOF_INT));
return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
ClassSize.align(ClassSize.ARRAY + length) +
(2 * Bytes.SIZEOF_INT) +
Bytes.SIZEOF_LONG);
}
// this overload assumes that the length bytes have already been read,

View File

@ -149,6 +149,24 @@ public class Scan implements Writable {
}
}
/**
* Builds a scan object with the same specs as get.
* @param get get to model scan after
*/
public Scan(Get get) {
this.startRow = get.getRow();
this.stopRow = get.getRow();
this.filter = get.getFilter();
this.maxVersions = get.getMaxVersions();
this.tr = get.getTimeRange();
this.familyMap = get.getFamilyMap();
}
public boolean isGetScan() {
return this.startRow != null && this.startRow.length > 0 &&
Bytes.equals(this.startRow, this.stopRow);
}
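With this constructor, a Get can be serviced by the scan machinery, and
HRegion.get() (later in this commit) does exactly that. A sketch of the
conversion (family/qualifier are caller-supplied placeholders):

    Get get = new Get(Bytes.toBytes("row1"));
    get.addColumn(family, qualifier);
    Scan scan = new Scan(get);   // start row == stop row == "row1"
    assert scan.isGetScan();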
/**
* Get all columns from the specified family.
* <p>

View File

@ -177,9 +177,6 @@ public class SingleColumnValueFilter implements Filter {
// byte array copy?
int compareResult =
this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length));
if (LOG.isDebugEnabled()) {
LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length));
}
switch (this.compareOp) {
case LESS:
return compareResult <= 0;

View File

@ -0,0 +1,50 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class DebugPrint {
private static final AtomicBoolean enabled = new AtomicBoolean(false);
private static final Object sync = new Object();
public static StringBuilder out = new StringBuilder();
static public void enable() {
enabled.set(true);
}
static public void disable() {
enabled.set(false);
}
static public void reset() {
synchronized (sync) {
enable(); // a reset implies the caller wants capture enabled
out = new StringBuilder();
}
}
static public void dumpToFile(String file) throws IOException {
FileWriter f = new FileWriter(file);
synchronized (sync) {
f.write(out.toString());
}
f.close();
}
public static void println(String m) {
if (!enabled.get()) {
System.out.println(m);
return;
}
synchronized (sync) {
String threadName = Thread.currentThread().getName();
out.append("<");
out.append(threadName);
out.append("> ");
out.append(m);
out.append("\n");
}
}
}
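DebugPrint is a small debugging aid: while disabled (the default), println()
falls through to stdout; once enabled it captures output, tagged with the
calling thread's name, into an in-memory buffer that can be dumped to a
file. Typical usage might look like this (the path is illustrative):

    DebugPrint.reset();                      // enables capture, clears buffer
    DebugPrint.println("seek done");         // buffered as "<thread-name> seek done"
    DebugPrint.dumpToFile("/tmp/debug.log");
    DebugPrint.disable();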

View File

@ -54,22 +54,27 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@ -175,6 +180,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* Set flags that make this region read-only.
*
* @param onOff flip value for region r/o setting
*/
synchronized void setReadOnly(final boolean onOff) {
this.writesEnabled = !onOff;
@ -190,7 +197,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
private volatile WriteState writestate = new WriteState();
private final WriteState writestate = new WriteState();
final long memstoreFlushSize;
private volatile long lastFlushTime;
@ -210,6 +217,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
private long minSequenceId;
private boolean splitRequest;
private final ReadWriteConsistencyControl rwcc =
new ReadWriteConsistencyControl();
/**
* Name of the region info file that resides just under the region directory.
*/
@ -296,10 +306,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* Initialize this region and get it ready to roll.
* Called after construction.
*
* @param initialFiles
* @param reporter
* @throws IOException
*
* @param initialFiles path
* @param reporter progressable
* @throws IOException e
*/
public void initialize(Path initialFiles, final Progressable reporter)
throws IOException {
@ -437,6 +447,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return this.closing.get();
}
public ReadWriteConsistencyControl getRWCC() {
return rwcc;
}
/**
* Close down this HRegion. Flush the cache, shut down each HStore, don't
* service any more calls.
@ -447,8 +461,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of all HStoreFile objects. Returns empty
* vector if already closed and null if judged that it should not close.
*
* @throws IOException
*
* @throws IOException e
*/
public List<StoreFile> close() throws IOException {
return close(false);
@ -465,8 +479,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of HStoreFile objects. Can be null if
* we are not to close at this time or we are already closed.
*
* @throws IOException
*
* @throws IOException e
*/
public List<StoreFile> close(final boolean abort) throws IOException {
if (isClosed()) {
@ -598,6 +612,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
/** @return the last time the region was flushed */
@SuppressWarnings({"UnusedDeclaration"})
public long getLastFlushTime() {
return this.lastFlushTime;
}
@ -699,8 +714,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
HRegion regions[] = new HRegion [] {regionA, regionB};
return regions;
return new HRegion [] {regionA, regionB};
}
}
@ -774,7 +788,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* server does them sequentially and not in parallel.
*
* @return mid key if split is needed
* @throws IOException
* @throws IOException e
*/
public byte [] compactStores() throws IOException {
boolean majorCompaction = this.forceMajorCompaction;
@ -795,7 +809,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*
* @param majorCompaction True to force a major compaction regardless of thresholds
* @return split row if split is needed
* @throws IOException
* @throws IOException e
*/
byte [] compactStores(final boolean majorCompaction)
throws IOException {
@ -863,8 +877,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* time-sensitive thread.
*
* @return true if cache was flushed
*
* @throws IOException
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
@ -929,8 +943,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* <p> This method may block for some time.
*
* @return true if the region needs compacting
*
* @throws IOException
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
@ -958,10 +972,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// during the flush
long sequenceId = -1L;
long completeSequenceId = -1L;
// we have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
this.updatesLock.writeLock().lock();
// Get current size of memstores.
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
sequenceId = log.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
@ -970,6 +987,13 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
storeFlushers.add(s.getStoreFlusher(completeSequenceId));
}
// This thread is going to cause a whole bunch of scanners to reseek.
// They depend on a thread-local to know where to read from.
// We set the read point this high up so that each RegionScanner has a
// single read point for all its sub-StoreScanners.
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
// prepare flush (take a snapshot)
for (StoreFlusher flusher : storeFlushers) {
flusher.prepare();
@ -978,6 +1002,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
this.updatesLock.writeLock().unlock();
}
LOG.debug("Finished snapshotting, commencing flushing stores");
// Any failure from here on out will be catastrophic requiring server
// restart so hlog content can be replayed and put back into the memstore.
// Otherwise, the snapshot content while backed up in the hlog, it will not
@ -992,15 +1018,37 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
flusher.flushCache();
}
internalPreFlushcacheCommit();
Callable<Void> atomicWork = internalPreFlushcacheCommit();
/*
* Switch between memstore and the new store file(s).
LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
/**
* Switch between memstore(snapshot) and the new store file
*/
for (StoreFlusher flusher : storeFlushers) {
boolean needsCompaction = flusher.commit();
if (needsCompaction) {
compactionRequested = true;
if (atomicWork != null) {
LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
newScannerLock.writeLock().lock();
}
try {
// update this again to make sure we are 'fresh'
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
if (atomicWork != null) {
atomicWork.call();
}
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
for (StoreFlusher flusher : storeFlushers) {
boolean needsCompaction = flusher.commit();
if (needsCompaction) {
compactionRequested = true;
}
}
} finally {
if (atomicWork != null) {
newScannerLock.writeLock().unlock();
}
}
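In outline, the flush path now holds the updates write lock only while
snapshotting, flushes without it, and resets the flusher thread's read point
before the commit so the re-created scanners read from a consistent point. A
simplified sketch, with error handling and the atomicWork hook elided:

    this.updatesLock.writeLock().lock();  // no edit may straddle snapshot+memstore
    try {
      for (StoreFlusher f : storeFlushers) f.prepare();  // snapshot memstores
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    for (StoreFlusher f : storeFlushers) f.flushCache(); // write hfiles, no lock
    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
    for (StoreFlusher f : storeFlushers) f.commit();     // swap snapshot -> hfile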
@ -1049,16 +1097,22 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return compactionRequested;
}
/**
* A hook for sub-classes wishing to perform operations prior to the
* cache flush commit stage.
*
* @throws IOException allow children to throw exception
*/
protected void internalPreFlushcacheCommit() throws IOException {
}
/**
/**
* A hook for subclasses wishing to perform operations prior to the cache
* flush commit stage.
*
* If a subclass wants its own work committed atomically with the flush
* commit stage, it should return a Callable; the new-scanner lock is
* acquired before the call and released afterwards.
* @throws java.io.IOException allow children to throw exception
*/
protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
return null;
}
/**
* Get the sequence number to be associated with this cache flush. Used by
* TransactionalRegion to not complete pending transactions.
*
@ -1093,9 +1147,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* <i>ts</i>.
*
* @param row row key
* @param family
* @param family column family to find on
* @return map of values
* @throws IOException
* @throws IOException read exceptions
*/
public Result getClosestRowBefore(final byte [] row, final byte [] family)
throws IOException {
@ -1112,11 +1166,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (key == null) {
return null;
}
// This will get all results for this store. TODO: Do we need to do this?
Get get = new Get(key.getRow());
List<KeyValue> results = new ArrayList<KeyValue>();
store.get(get, null, results);
return new Result(results);
get.addFamily(family);
return get(get, null);
} finally {
splitsAndClosesLock.readLock().unlock();
}
@ -1130,7 +1182,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*
* @param scan configured {@link Scan}
* @return InternalScanner
* @throws IOException
* @throws IOException read exceptions
*/
public InternalScanner getScanner(Scan scan)
throws IOException {
@ -1160,8 +1212,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
protected InternalScanner instantiateInternalScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
return new RegionScanner(scan, additionalScanners);
}
@ -1169,15 +1220,16 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
/**
* @param delete
* @param lockid
* @param writeToWAL
* @throws IOException
* @param delete delete object
* @param lockid existing lock id, or null to acquire a new one
* @param writeToWAL whether to append to the write-ahead log
* @throws IOException read exceptions
*/
public void delete(Delete delete, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
checkResources();
Integer lid = null;
splitsAndClosesLock.readLock().lock();
Integer lid = null;
try {
@ -1185,7 +1237,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// If we did not pass an existing row lock, obtain a new one
lid = getLock(lockid, row);
//Check to see if this is a deleteRow insert
// Check to see if this is a deleteRow insert
if(delete.getFamilyMap().isEmpty()){
for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
// Don't eat the timestamp
@ -1220,7 +1272,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
long now = System.currentTimeMillis();
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
this.updatesLock.readLock().lock();
updatesLock.readLock().lock();
ReadWriteConsistencyControl.WriteEntry w = null;
try {
@ -1237,21 +1291,21 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
byte[] qual = kv.getQualifier();
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
Integer count = kvCount.get(qual);
if (count == null) {
kvCount.put(qual, new Integer(1));
kvCount.put(qual, 1);
} else {
kvCount.put(qual, new Integer(count+1));
kvCount.put(qual, count + 1);
}
count = kvCount.get(qual);
List<KeyValue> result = new ArrayList<KeyValue>(1);
Get g = new Get(kv.getRow());
g.setMaxVersions(count);
NavigableSet<byte []> qualifiers =
new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
qualifiers.add(qual);
get(store, g, qualifiers, result);
Get get = new Get(kv.getRow());
get.setMaxVersions(count);
get.addColumn(family, qual);
List<KeyValue> result = get(get);
if (result.size() < count) {
// Nothing to delete
kv.updateLatestStamp(byteNow);
@ -1294,11 +1348,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
long size = 0;
//
// Now make changes to the memstore.
//
long size = 0;
w = rwcc.beginMemstoreInsert();
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@ -1306,13 +1360,17 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Store store = getStore(family);
for (KeyValue kv: kvs) {
kv.setMemstoreTS(w.getWriteNumber());
size = this.memstoreSize.addAndGet(store.delete(kv));
}
}
flush = isFlushSize(size);
} finally {
if (w != null) rwcc.completeMemstoreInsert(w);
this.updatesLock.readLock().unlock();
}
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@ -1361,6 +1419,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// will be extremely rare; we'll deal with it when it happens.
checkResources();
splitsAndClosesLock.readLock().lock();
try {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
@ -1370,6 +1429,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
byte [] row = put.getRow();
// If we did not pass an existing row lock, obtain a new one
Integer lid = getLock(lockid, row);
byte [] now = Bytes.toBytes(System.currentTimeMillis());
try {
// All edits for the given row (across all column families) must happen atomically.
@ -1418,15 +1478,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Integer lid = getLock(lockId, get.getRow());
List<KeyValue> result = new ArrayList<KeyValue>();
try {
//Getting data
for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
get.getFamilyMap().entrySet()) {
get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
}
result = get(get);
boolean matches = false;
if (result.size() == 0 && expectedValue.length == 0) {
matches = true;
} else if(result.size() == 1) {
} else if (result.size() == 1) {
//Compare the expected value with the actual value
byte [] actualValue = result.get(0).getValue();
matches = Bytes.equals(expectedValue, actualValue);
@ -1542,6 +1599,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* Add updates first to the hlog and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param family
* @param edits Cell updates by column
* @param now
* @throws IOException
@ -1561,12 +1619,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @throws IOException
*/
private void put(final Map<byte [], List<KeyValue>> familyMap,
boolean writeToWAL)
throws IOException {
boolean writeToWAL) throws IOException {
long now = System.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
this.updatesLock.readLock().lock();
ReadWriteConsistencyControl.WriteEntry w = null;
try {
WALEdit walEdit = new WALEdit();
@ -1605,8 +1663,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
}
long size = 0;
w = rwcc.beginMemstoreInsert();
// now make changes to the memstore
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
@ -1614,11 +1675,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Store store = getStore(family);
for (KeyValue kv: edits) {
kv.setMemstoreTS(w.getWriteNumber());
size = this.memstoreSize.addAndGet(store.add(kv));
}
}
flush = isFlushSize(size);
} finally {
if (w != null) rwcc.completeMemstoreInsert(w);
this.updatesLock.readLock().unlock();
}
if (flush) {
@ -1862,11 +1926,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
private final byte [] stopRow;
private Filter filter;
private List<KeyValue> results = new ArrayList<KeyValue>();
private int isScan;
private int batch;
// Doesn't need to be volatile, always accessed under a sync'ed method
private boolean filterClosed = false;
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
//DebugPrint.println("HRegionScanner.<init>, threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint());
this.filter = scan.getFilter();
this.batch = scan.getBatch();
@ -1875,7 +1943,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
} else {
this.stopRow = scan.getStopRow();
}
this.isScan = scan.isGetScan() ? -1 : 0;
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
@ -1897,6 +1966,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (filter != null) {
filter.reset();
}
// Start the next row read and reset the thread point
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
}
public synchronized boolean next(List<KeyValue> outResults, int limit)
@ -1911,6 +1983,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing=" + closing.get() + " or closed=" + closed.get());
}
// This could be a new thread from the last time we called next().
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
results.clear();
boolean returnResult = nextInternal(limit);
if (!returnResult && filter != null && filter.filterRow()) {
@ -2494,10 +2569,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// HBASE-880
//
/**
* @param get
* @param lockid
* @param get get object
* @param lockid existing lock id, or null for no previous lock
* @return result
* @throws IOException
* @throws IOException read exceptions
*/
public Result get(final Get get, final Integer lockid) throws IOException {
// Verify families are all valid
@ -2510,24 +2585,28 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
get.addFamily(family);
}
}
// Lock row
Integer lid = getLock(lockid, get.getRow());
List<KeyValue> result = new ArrayList<KeyValue>();
try {
for (Map.Entry<byte[],NavigableSet<byte[]>> entry:
get.getFamilyMap().entrySet()) {
get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
}
} finally {
if(lockid == null) releaseRowLock(lid);
}
List<KeyValue> result = get(get);
return new Result(result);
}
private void get(final Store store, final Get get,
final NavigableSet<byte []> qualifiers, List<KeyValue> result)
throws IOException {
store.get(get, qualifiers, result);
/*
* Do a get based on the get parameter.
*/
private List<KeyValue> get(final Get get) throws IOException {
Scan scan = new Scan(get);
List<KeyValue> results = new ArrayList<KeyValue>();
InternalScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}
return results;
}
/**
@ -2536,6 +2615,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @param family
* @param qualifier
* @param amount
* @param writeToWAL
* @return The new value.
* @throws IOException
*/
@ -2550,6 +2630,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
try {
Store store = stores.get(family);
// TODO call the proper GET API
// Get the old value:
Get get = new Get(row);
get.addColumn(family, qualifier);
@ -2614,8 +2695,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
(5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
(20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
(21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER +

View File

@ -167,6 +167,7 @@ class KeyValueSkipListSet implements NavigableSet<KeyValue> {
}
public boolean contains(Object o) {
//noinspection SuspiciousMethodCalls
return this.delegatee.containsKey(o);
}

View File

@ -20,15 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@ -43,6 +34,15 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
* The MemStore holds in-memory modifications to the Store. Modifications
* are {@link KeyValue}s. When asked to flush, current memstore is moved
@ -80,10 +80,6 @@ public class MemStore implements HeapSize {
// Used to track own heapSize
final AtomicLong size;
// All access must be synchronized.
final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
new CopyOnWriteArraySet<ChangedMemStoreObserver>();
/**
* Default constructor. Used for tests.
*/
@ -131,7 +127,6 @@ public class MemStore implements HeapSize {
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
tellChangedMemStoreObservers();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
}
@ -141,15 +136,6 @@ public class MemStore implements HeapSize {
}
}
/*
* Tell outstanding scanners that memstore has changed.
*/
private void tellChangedMemStoreObservers() {
for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
o.changedMemStore();
}
}
/**
* Return the current snapshot.
* Called by flusher to get current snapshot made by a previous
@ -180,7 +166,6 @@ public class MemStore implements HeapSize {
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
tellChangedMemStoreObservers();
}
} finally {
this.lock.writeLock().unlock();
@ -212,69 +197,8 @@ public class MemStore implements HeapSize {
long delete(final KeyValue delete) {
long s = 0;
this.lock.readLock().lock();
//Have to find out what we want to do here, to find the fastest way of
//removing things that are under a delete.
//Actions that will take place here are:
//1. Insert a delete and remove all the affected entries already in memstore
//2. In the case of a Delete and the matching put is found then don't insert
// the delete
//TODO Would be nice with if we had an iterator for this, so we could remove
//things that needs to be removed while iterating and don't have to go
//back and do it afterwards
try {
boolean notpresent = false;
List<KeyValue> deletes = new ArrayList<KeyValue>();
SortedSet<KeyValue> tail = this.kvset.tailSet(delete);
//Parse the delete, so that it is only done once
byte [] deleteBuffer = delete.getBuffer();
int deleteOffset = delete.getOffset();
int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
deleteOffset += Bytes.SIZEOF_SHORT;
int deleteRowOffset = deleteOffset;
deleteOffset += deleteRowLen;
byte deleteFamLen = deleteBuffer[deleteOffset];
deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
int deleteQualifierOffset = deleteOffset;
int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
Bytes.SIZEOF_BYTE;
deleteOffset += deleteQualifierLen;
int deleteTimestampOffset = deleteOffset;
deleteOffset += Bytes.SIZEOF_LONG;
byte deleteType = deleteBuffer[deleteOffset];
//Comparing with tail from memstore
for (KeyValue kv : tail) {
DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
deleteRowOffset, deleteRowLen, deleteQualifierOffset,
deleteQualifierLen, deleteTimestampOffset, deleteType,
comparator.getRawComparator());
if (res == DeleteCode.DONE) {
break;
} else if (res == DeleteCode.DELETE) {
deletes.add(kv);
} // SKIP
}
//Delete all the entries effected by the last added delete
for (KeyValue kv : deletes) {
notpresent = this.kvset.remove(kv);
s -= heapSizeChange(kv, notpresent);
}
// Adding the delete to memstore. Add any value, as long as
// same instance each time.
s += heapSizeChange(delete, this.kvset.add(delete));
} finally {
this.lock.readLock().unlock();
@ -335,7 +259,7 @@ public class MemStore implements HeapSize {
}
/**
* @param state
* @param state column/delete tracking state
*/
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
this.lock.readLock().lock();
@ -459,7 +383,7 @@ public class MemStore implements HeapSize {
this.lock.readLock().lock();
try {
KeyValueScanner [] scanners = new KeyValueScanner[1];
scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
scanners[0] = new MemStoreScanner();
return scanners;
} finally {
this.lock.readLock().unlock();
@ -481,10 +405,8 @@ public class MemStore implements HeapSize {
* @param matcher Column matcher
* @param result List to add results to
* @return true if done with store (early-out), false if not
* @throws IOException
*/
public boolean get(QueryMatcher matcher, List<KeyValue> result)
throws IOException {
public boolean get(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
@ -501,11 +423,11 @@ public class MemStore implements HeapSize {
* Gets from either the memstore or the snapshot, and returns a code
* to let you know which is which.
*
* @param matcher
* @param result
* @param matcher query matcher
* @param result puts results here
* @return 1 == memstore, 2 == snapshot, 0 == none
*/
int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
this.lock.readLock().lock();
try {
boolean fromMemstore = internalGet(this.kvset, matcher, result);
@ -540,11 +462,9 @@ public class MemStore implements HeapSize {
* @param matcher query matcher
* @param result list to add results to
* @return true if done with store (early-out), false if not
* @throws IOException
*/
boolean internalGet(final NavigableSet<KeyValue> set,
final QueryMatcher matcher, final List<KeyValue> result)
throws IOException {
final QueryMatcher matcher, final List<KeyValue> result) {
if(set.isEmpty()) return false;
// Seek to startKey
SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
@ -574,163 +494,144 @@ public class MemStore implements HeapSize {
* map and snapshot.
* This behaves as if it were a real scanner but does not maintain position.
*/
protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0;
// Make access atomic.
private FirstOnRow firstOnNextRow = new FirstOnRow();
// Keep reference to Set so can remove myself when closed.
private final Set<ChangedMemStoreObserver> observers;
protected class MemStoreScanner implements KeyValueScanner {
// Next row information for either kvset or snapshot
private KeyValue kvsetNextRow = null;
private KeyValue snapshotNextRow = null;
MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
// iterator based scanning.
Iterator<KeyValue> kvsetIt;
Iterator<KeyValue> snapshotIt;
/*
Some notes...
The MemStoreScanner is fixed at creation time; this includes its
pointers/iterators into the existing kvset/snapshot. During snapshot
creation the kvset is null and the snapshot is moved; since the kvset is
null there is no point in reseeking both, so we can save ourselves the
trouble. During the snapshot->hfile transition, the memstore scanner is
re-created by StoreScanner#updateReaders(). StoreScanner could potentially
do something smarter by adjusting the existing memstore scanner.

But there is a greater problem: once a scanner has progressed during a
snapshot scenario, we currently iterate past the kvset and then 'finish'
up. If a scan lasts a while, new entries may become available in the kvset,
but we will never see them. This needs to be handled at the StoreScanner
level, in coordination with MemStoreScanner.
*/
MemStoreScanner() {
super();
this.observers = observers;
this.observers.add(this);
//DebugPrint.println(" MS new@" + hashCode());
}
public boolean seek(KeyValue key) {
try {
if (key == null) {
close();
return false;
protected KeyValue getNext(Iterator<KeyValue> it) {
KeyValue ret = null;
long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
//DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
while (ret == null && it.hasNext()) {
KeyValue v = it.next();
if (v.getMemstoreTS() <= readPoint) {
// keep it.
ret = v;
}
this.firstOnNextRow.set(key);
return cacheNextRow();
} catch(Exception e) {
}
return ret;
}
public synchronized boolean seek(KeyValue key) {
if (key == null) {
close();
return false;
}
// kvset and snapshot will never be empty.
// if tailSet can't find anything, SS is empty (not null).
SortedSet<KeyValue> kvTail = kvset.tailSet(key);
SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
kvsetIt = kvTail.iterator();
snapshotIt = snapshotTail.iterator();
kvsetNextRow = getNext(kvsetIt);
snapshotNextRow = getNext(snapshotIt);
//long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
//DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
// kvset.size() + " threadread = " + readPoint);
//DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
// snapshot.size() + " threadread = " + readPoint);
KeyValue lowest = getLowest();
// has data := (lowest != null)
return lowest != null;
}
public KeyValue peek() {
if (idx >= this.result.size()) {
if (!cacheNextRow()) {
public synchronized KeyValue peek() {
//DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
return getLowest();
}
public synchronized KeyValue next() {
KeyValue theNext = getLowest();
if (theNext == null) {
return null;
}
return peek();
}
return result.get(idx);
// Advance one of the iterators
if (theNext == kvsetNextRow) {
kvsetNextRow = getNext(kvsetIt);
} else {
snapshotNextRow = getNext(snapshotIt);
}
//long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
//DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
// getLowest() + " threadpoint=" + readpoint);
return theNext;
}
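The rewritten scanner is thus a two-way merge over the kvset and snapshot
iterators: peek() returns the lower of the two cached heads, and next()
advances whichever side supplied it. Conceptually (not the literal code):

    KeyValue head = getLower(kvsetNextRow, snapshotNextRow);  // what peek() returns
    if (head == kvsetNextRow) {
      kvsetNextRow = getNext(kvsetIt);        // advance the kvset side
    } else {
      snapshotNextRow = getNext(snapshotIt);  // advance the snapshot side
    }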
public KeyValue next() {
if (idx >= result.size()) {
if (!cacheNextRow()) {
return null;
}
return next();
}
return this.result.get(idx++);
}
/**
* @return True if successfully cached a next row.
*/
boolean cacheNextRow() {
// Prevent snapshot being cleared while caching a row.
lock.readLock().lock();
try {
this.result.clear();
this.idx = 0;
// Look at each set, kvset and snapshot.
// Both look for matching entries for this.current row returning what
// they
// have as next row after this.current (or null if nothing in set or if
// nothing follows.
KeyValue kvsetNextRow = cacheNextRow(kvset);
KeyValue snapshotNextRow = cacheNextRow(snapshot);
if (kvsetNextRow == null && snapshotNextRow == null) {
// Nothing more in memstore but we might have gotten current row
// results
// Indicate at end of store by setting next row to null.
this.firstOnNextRow.set(null);
return !this.result.isEmpty();
} else if (kvsetNextRow != null && snapshotNextRow != null) {
// Set current at the lowest of the two values.
int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
} else {
this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
}
return true;
} finally {
lock.readLock().unlock();
}
protected KeyValue getLowest() {
return getLower(kvsetNextRow,
snapshotNextRow);
}
/*
* See if set has entries for the <code>this.current</code> row. If so,
* add them to <code>this.result</code>.
* @param set Set to examine
* @return Next row in passed <code>set</code> or null if nothing in this
* passed <code>set</code>
* Returns the lower of the two key values, or null if they are both null.
* This uses comparator.compare() to compare the KeyValue using the memstore
* comparator.
*/
private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
if (tail == null || tail.isEmpty()) return null;
KeyValue first = tail.first();
KeyValue nextRow = null;
for (KeyValue kv: tail) {
if (comparator.compareRows(first, kv) != 0) {
nextRow = kv;
break;
}
this.result.add(kv);
protected KeyValue getLower(KeyValue first, KeyValue second) {
if (first == null && second == null) {
return null;
}
return nextRow;
}
public void close() {
this.firstOnNextRow.set(null);
idx = 0;
if (!result.isEmpty()) {
result.clear();
if (first != null && second != null) {
int compare = comparator.compare(first, second);
return (compare <= 0 ? first : second);
}
this.observers.remove(this);
return (first != null ? first : second);
}
public void changedMemStore() {
this.firstOnNextRow.reset();
}
}
public synchronized void close() {
this.kvsetNextRow = null;
this.snapshotNextRow = null;
/*
* Private class that holds firstOnRow and utility.
* Usually firstOnRow is the first KeyValue we find on next row rather than
* the absolute minimal first key (empty column, Type.Maximum, maximum ts).
* Usually its ok being sloppy with firstOnRow letting it be the first thing
* found on next row -- this works -- but if the memstore changes on us, reset
* firstOnRow to be the ultimate firstOnRow. We play sloppy with firstOnRow
* usually so we don't have to allocate a new KeyValue each time firstOnRow
* is updated.
*/
private static class FirstOnRow {
private KeyValue firstOnRow = null;
FirstOnRow() {
super();
}
synchronized void set(final KeyValue kv) {
this.firstOnRow = kv;
}
/* Reset firstOnRow to a 'clean', absolute firstOnRow.
*/
synchronized void reset() {
if (this.firstOnRow == null) return;
this.firstOnRow =
new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
}
synchronized KeyValue get() {
return this.firstOnRow;
this.kvsetIt = null;
this.snapshotIt = null;
}
}
public final static long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@ -770,7 +671,7 @@ public class MemStore implements HeapSize {
* enough. See hbase-900. Fills memstores then waits so user can heap
* dump and bring up resultant hprof in something like jprofiler which
* allows you get 'deep size' on objects.
* @param args
* @param args main args
*/
public static void main(String [] args) {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
@ -807,16 +708,4 @@ public class MemStore implements HeapSize {
}
LOG.info("Exiting.");
}
/**
* Observers want to know about MemStore changes.
* Called when snapshot is cleared and when we make one.
*/
interface ChangedMemStoreObserver {
/**
* Notify observers.
* @throws IOException
*/
void changedMemStore();
}
}

View File

@ -0,0 +1,106 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
/**
* Manages the read/write consistency within memstore. This provides
* an interface for readers to determine what entries to ignore, and
* a mechanism for writers to obtain new write numbers, then "commit"
* the new writes for readers to read (thus forming atomic transactions).
*/
public class ReadWriteConsistencyControl {
private final AtomicLong memstoreRead = new AtomicLong();
private final AtomicLong memstoreWrite = new AtomicLong();
// This is the pending queue of writes.
private final LinkedList<WriteEntry> writeQueue =
new LinkedList<WriteEntry>();
private static final ThreadLocal<Long> perThreadReadPoint =
new ThreadLocal<Long>();
public static long getThreadReadPoint() {
return perThreadReadPoint.get();
}
public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
perThreadReadPoint.set(rwcc.memstoreReadPoint());
return getThreadReadPoint();
}
public WriteEntry beginMemstoreInsert() {
synchronized (writeQueue) {
long nextWriteNumber = memstoreWrite.incrementAndGet();
WriteEntry e = new WriteEntry(nextWriteNumber);
writeQueue.add(e);
return e;
}
}
public void completeMemstoreInsert(WriteEntry e) {
synchronized (writeQueue) {
e.markCompleted();
long nextReadValue = -1;
boolean ranOnce=false;
while (!writeQueue.isEmpty()) {
ranOnce=true;
WriteEntry queueFirst = writeQueue.getFirst();
if (nextReadValue > 0) {
if (nextReadValue+1 != queueFirst.getWriteNumber()) {
throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+ nextReadValue + " next: " + queueFirst.getWriteNumber());
}
}
if (queueFirst.isCompleted()) {
nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
} else {
break;
}
}
if (!ranOnce) {
throw new RuntimeException("never was a first");
}
if (nextReadValue > 0) {
memstoreRead.set(nextReadValue);
}
}
// Spin until any other concurrent puts have finished. This makes sure that
// if we move on to construct a scanner, we'll get read-your-own-writes
// consistency. We anticipate that since puts to the memstore are very fast,
// this will be on the order of microseconds - so spinning should be faster
// than a condition variable.
int spun = 0;
while (memstoreRead.get() < e.getWriteNumber()) {
spun++;
}
// Could potentially expose spun as a metric
}
public long memstoreReadPoint() {
return memstoreRead.get();
}
public static class WriteEntry {
private long writeNumber;
private boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
void markCompleted() {
this.completed = true;
}
boolean isCompleted() {
return this.completed;
}
long getWriteNumber() {
return this.writeNumber;
}
}
}
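Putting it together, the writer protocol that HRegion.put() and delete() now
follow is: begin a memstore insert to obtain a write number, stamp every
KeyValue with it, add them to the memstore, and only then complete the
insert, which advances the global read point once all earlier writes have
completed. A condensed sketch (edits and memstore stand in for the real
fields):

    ReadWriteConsistencyControl.WriteEntry w = null;
    try {
      w = rwcc.beginMemstoreInsert();
      for (KeyValue kv : edits) {
        kv.setMemstoreTS(w.getWriteNumber()); // invisible to current readers
        memstore.add(kv);
      }
    } finally {
      if (w != null) rwcc.completeMemstoreInsert(w); // publish atomically
    }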

View File

@ -34,6 +34,7 @@ public class ScanQueryMatcher extends QueryMatcher {
// Optimization so we can skip lots of compares when we decide to skip
// to the next row.
private boolean stickyNextRow;
private KeyValue stopKey = null;
/**
* Constructs a QueryMatcher for a Scan.
@ -51,6 +52,11 @@ public class ScanQueryMatcher extends QueryMatcher {
this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
if (scan.isGetScan()) {
this.stopKey = KeyValue.createLastOnRow(scan.getStopRow());
} else {
this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
}
this.filter = scan.getFilter();
// Single branch to deal with two types of reads (columns vs all in family)
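The distinction matters because a get-scan's stop row equals its start row:
its stop key must sort after every KeyValue on that row, or the row itself
would be cut off. Concretely:

    byte[] row = Bytes.toBytes("r1");
    KeyValue stopInclusive = KeyValue.createLastOnRow(row);  // get-scan: whole row
    KeyValue stopExclusive = KeyValue.createFirstOnRow(row); // ordinary scan: stop row excluded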

View File

@ -609,9 +609,12 @@ public class Store implements HConstants, HeapSize {
this.lock.writeLock().lock();
try {
this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
this.memstore.clearSnapshot(set);
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
this.memstore.clearSnapshot(set);
return this.storefiles.size() >= this.compactionThreshold;
} finally {
this.lock.writeLock().unlock();
@ -639,10 +642,8 @@ public class Store implements HConstants, HeapSize {
* @param o Observer no longer interested in changes in set of Readers.
*/
void deleteChangedReaderObserver(ChangedReadersObserver o) {
if(this.changedReaderObservers.size() > 0) {
if (!this.changedReaderObservers.remove(o)) {
LOG.warn("Not in set" + o);
}
if (!this.changedReaderObservers.remove(o)) {
LOG.warn("Not in set" + o);
}
}
@ -993,6 +994,10 @@ public class Store implements HConstants, HeapSize {
Long orderVal = Long.valueOf(result.getMaxSequenceId());
this.storefiles.put(orderVal, result);
}
// WARN ugly hack here, but necessary sadly.
ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
// Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
@ -1449,7 +1454,12 @@ public class Store implements HConstants, HeapSize {
}
/**
* Increments the value for the given row/family/qualifier
* Increments the value for the given row/family/qualifier.
*
* This operation always appears atomic to other readers because it puts
* only a single KeyValue into the memstore, so no read/write consistency
* control is necessary.
*
* @param row
* @param f
* @param qualifier

View File

@ -32,7 +32,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Scanner scans both the memstore and the HStore. Coalesces the KeyValue stream
@ -46,10 +45,15 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
private boolean cacheBlocks;
// Used to indicate that the scanner has closed (see HBASE-1107)
private final AtomicBoolean closing = new AtomicBoolean(false);
private boolean closing = false;
private final boolean isGet;
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles.
*
* @param store who we scan
* @param scan the spec
* @param columns which columns we are scanning
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
this.store = store;
@ -58,9 +62,11 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
this.isGet = scan.isGetScan();
List<KeyValueScanner> scanners = getScanners();
// Seek all scanners to the initial key
// TODO if scan.isGetScan, use bloomfilters to skip seeking
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
@ -76,10 +82,14 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
* Used for major compactions.<p>
*
* Opens a scanner across specified StoreFiles.
* @param store who we scan
* @param scan the spec
* @param scanners ancillary scanners
*/
StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) {
this.store = store;
this.cacheBlocks = false;
this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));
@ -99,6 +109,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
final NavigableSet<byte[]> columns,
final KeyValueScanner [] scanners) {
this.store = null;
this.isGet = false;
this.cacheBlocks = scan.getCacheBlocks();
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), scan.getMaxVersions());
@ -132,8 +143,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
}
public synchronized void close() {
boolean state = this.closing.getAndSet(true);
if (state) return;
if (this.closing) return;
this.closing = true;
// under test, we don't have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
@ -146,11 +157,12 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
/**
* Get the next row of values from this Store.
* @param result
* @param outResult
* @param limit
* @return true if there are more rows, false if scanner is done
*/
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
//DebugPrint.println("SS.next");
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@ -161,6 +173,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
List<KeyValue> results = new ArrayList<KeyValue>();
LOOP: while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode qcode = matcher.match(kv);
//DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
switch(qcode) {
case INCLUDE:
KeyValue next = this.heap.next();
@ -228,8 +241,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
LOG.warn("StoreFile " + sf + " has null Reader");
continue;
}
// Get a scanner that does not use pread.
s.add(r.getScanner(this.cacheBlocks, false));
// If isGet is true, use pread; otherwise use a streaming (non-pread) scanner
s.add(r.getScanner(this.cacheBlocks, isGet));
}
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(s.size()+1);
@ -241,12 +254,16 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
// Implementation of ChangedReadersObserver
public synchronized void updateReaders() throws IOException {
if (this.closing.get()) return;
if (this.closing) return;
KeyValue topKey = this.peek();
if (topKey == null) return;
List<KeyValueScanner> scanners = getScanners();
// Seek all scanners to the initial key
// close the previous scanners:
this.heap.close(); // bubble thru and close all scanners.
this.heap = null; // the re-seeks could be slow (they access HDFS), so free up memory ASAP
for(KeyValueScanner scanner : scanners) {
scanner.seek(topKey);
}

View File

@ -365,6 +365,7 @@ public class FSUtils {
return true;
}
// TODO move this method OUT of FSUtils. No dependencies on HMaster
/**
* Returns the total overall fragmentation percentage. Includes .META. and
* -ROOT- as well.

View File

@ -277,4 +277,49 @@ public class TestKeyValue extends TestCase {
// TODO actually write this test!
}
private final byte[] rowA = Bytes.toBytes("rowA");
private final byte[] rowB = Bytes.toBytes("rowB");
private final byte[] family = Bytes.toBytes("family");
private final byte[] qualA = Bytes.toBytes("qfA");
private void assertKVLess(KeyValue.KVComparator c,
KeyValue less,
KeyValue greater) {
int cmp = c.compare(less,greater);
assertTrue(cmp < 0);
cmp = c.compare(greater,less);
assertTrue(cmp > 0);
}
public void testFirstLastOnRow() {
final KVComparator c = KeyValue.COMPARATOR;
long ts = 1;
// These are listed in sort order (ie: every one should be less
// than the one on the next line).
final KeyValue firstOnRowA = KeyValue.createFirstOnRow(rowA);
final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put);
final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put);
final KeyValue lastOnRowA = KeyValue.createLastOnRow(rowA);
final KeyValue firstOnRowB = KeyValue.createFirstOnRow(rowB);
final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put);
assertKVLess(c, firstOnRowA, firstOnRowB);
assertKVLess(c, firstOnRowA, kvA_1);
assertKVLess(c, firstOnRowA, kvA_2);
assertKVLess(c, kvA_1, kvA_2);
assertKVLess(c, kvA_2, firstOnRowB);
assertKVLess(c, kvA_1, firstOnRowB);
assertKVLess(c, lastOnRowA, firstOnRowB);
assertKVLess(c, firstOnRowB, kvB);
assertKVLess(c, lastOnRowA, kvB);
assertKVLess(c, kvA_2, lastOnRowA);
assertKVLess(c, kvA_1, lastOnRowA);
assertKVLess(c, firstOnRowA, lastOnRowA);
}
}

View File

@ -308,6 +308,32 @@ public class TestFromClientSide {
CompareFilter.CompareOp.GREATER_OR_EQUAL));
assertEquals(rowCount - endKeyCount, countGreater);
}
/*
* Load table with rows from 'aaa' to 'yyy'.
* @param t
* @return Count of rows loaded.
* @throws IOException
*/
private int loadTable(final HTable t) throws IOException {
// Add data to table.
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 < 'z'; b1++) {
for (byte b2 = 'a'; b2 < 'z'; b2++) {
for (byte b3 = 'a'; b3 < 'z'; b3++) {
k[0] = b1;
k[1] = b2;
k[2] = b3;
Put put = new Put(k);
put.add(FAMILY, new byte[0], k);
t.put(put);
rowCount++;
}
}
}
return rowCount;
}
/*
* @param key
@ -1452,7 +1478,7 @@ public class TestFromClientSide {
ht.put(put);
delete = new Delete(ROW);
delete.deleteColumn(FAMILIES[0], QUALIFIER);
delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4]
ht.delete(delete);
get = new Get(ROW);
@ -1487,23 +1513,24 @@ public class TestFromClientSide {
// But alas, this is not to be. We can't put them back in either case.
put = new Put(ROW);
put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000
put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000
ht.put(put);
// The Get returns the latest value but then does not return the
// oldest, which was never deleted, ts[1].
// It used to be the case, due to the internal implementation of Get, that
// the Get() call would return ts[4] UNLIKE the Scan below. With the switch
// to implementing Get on top of Scan this is no longer the case.
get = new Get(ROW);
get.addFamily(FAMILIES[0]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
new long [] {ts[2], ts[3], ts[4]},
new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
new long [] {ts[1], ts[2], ts[3]},
new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
0, 2);
// The Scanner returns the previous values, the expected-unexpected behavior
// The Scanner returns the previous values; naively unexpected, but correct.
scan = new Scan(ROW);
scan.addFamily(FAMILIES[0]);
@ -1537,6 +1564,15 @@ public class TestFromClientSide {
put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
ht.put(put);
// Assert that above went in.
get = new Get(ROWS[2]);
get.addFamily(FAMILIES[1]);
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertTrue("Expected 4 key but received " + result.size() + ": " + result,
result.size() == 4);
delete = new Delete(ROWS[0]);
delete.deleteFamily(FAMILIES[2]);
ht.delete(delete);
@ -1596,8 +1632,7 @@ public class TestFromClientSide {
get.addFamily(FAMILIES[2]);
get.setMaxVersions(Integer.MAX_VALUE);
result = ht.get(get);
assertTrue("Expected 1 key but received " + result.size(),
result.size() == 1);
assertEquals(1, result.size());
assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
new byte[][] {VALUES[2]},
@ -1608,8 +1643,7 @@ public class TestFromClientSide {
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertTrue("Expected 1 key but received " + result.size(),
result.size() == 1);
assertEquals(1, result.size());
assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long [] {ts[2]},
new byte[][] {VALUES[2]},
@ -1696,7 +1730,7 @@ public class TestFromClientSide {
}
}
/**
/*
* Baseline "scalability" test.
*
* Tests one hundred families, one million columns, one million versions
@ -2661,12 +2695,11 @@ public class TestFromClientSide {
"Got row [" + Bytes.toString(result.getRow()) +"]",
equals(row, result.getRow()));
int expectedResults = end - start + 1;
assertTrue("Expected " + expectedResults + " keys but result contains "
+ result.size(), result.size() == expectedResults);
assertEquals(expectedResults, result.size());
KeyValue [] keys = result.sorted();
for(int i=0;i<keys.length;i++) {
for (int i=0; i<keys.length; i++) {
byte [] value = values[end-i];
long ts = stamps[end-i];
KeyValue key = keys[i];
@ -2824,9 +2857,9 @@ public class TestFromClientSide {
}
private boolean equals(byte [] left, byte [] right) {
if(left == null && right == null) return true;
if(left == null && right.length == 0) return true;
if(right == null && left.length == 0) return true;
if (left == null && right == null) return true;
if (left == null && right.length == 0) return true;
if (right == null && left.length == 0) return true;
return Bytes.equals(left, right);
}

View File

@ -255,8 +255,8 @@ public class TestGetDeleteTracker extends HBaseTestCase implements HConstants {
}
//update()
dt.update();
assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts3));
assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts1));
assertFalse(dt.isDeleted(col2, 0, col2Len, ts3));
assertFalse(dt.isDeleted(col2, 0, col2Len, ts1));
}
public void testIsDeleted_Delete(){
//Building lists

View File

@ -93,7 +93,7 @@ public class TestHRegion extends HBaseTestCase {
//////////////////////////////////////////////////////////////////////////////
/**
/*
* An involved filter test. Has multiple column families and deletes in mix.
*/
public void testWeirdCacheBehaviour() throws Exception {
@ -361,6 +361,34 @@ public class TestHRegion extends HBaseTestCase {
//////////////////////////////////////////////////////////////////////////////
// Delete tests
//////////////////////////////////////////////////////////////////////////////
public void testDelete_multiDeleteColumn() throws IOException {
byte [] tableName = Bytes.toBytes("testtable");
byte [] row1 = Bytes.toBytes("row1");
byte [] fam1 = Bytes.toBytes("fam1");
byte [] qual = Bytes.toBytes("qualifier");
byte [] value = Bytes.toBytes("value");
Put put = new Put(row1);
put.add(fam1, qual, 1, value);
put.add(fam1, qual, 2, value);
String method = this.getName();
initHRegion(tableName, method, fam1);
region.put(put);
// We do support deleting more than 1 'latest' version
Delete delete = new Delete(row1);
delete.deleteColumn(fam1, qual);
delete.deleteColumn(fam1, qual);
region.delete(delete, null, false);
Get get = new Get(row1);
get.addFamily(fam1);
Result r = region.get(get, null);
assertEquals(0, r.size());
}
public void testDelete_CheckFamily() throws IOException {
byte [] tableName = Bytes.toBytes("testtable");
byte [] row1 = Bytes.toBytes("row1");
@ -369,11 +397,9 @@ public class TestHRegion extends HBaseTestCase {
byte [] fam3 = Bytes.toBytes("fam3");
byte [] fam4 = Bytes.toBytes("fam4");
byte[][] families = {fam1, fam2, fam3};
//Setting up region
String method = this.getName();
initHRegion(tableName, method, families);
initHRegion(tableName, method, fam1, fam2, fam3);
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(row1, fam4, null, null));
@ -1455,6 +1481,41 @@ public class TestHRegion extends HBaseTestCase {
assertICV(row, fam1, qual1, value+amount);
}
public void testIncrementColumnValue_BumpSnapshot() throws IOException {
initHRegion(tableName, getName(), fam1);
long value = 42L;
long incr = 44L;
// first put something in kvset, then snapshot it.
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
region.put(put);
// get the store in question:
Store s = region.getStore(fam1);
s.snapshot(); //bam
// now increment:
long newVal = region.incrementColumnValue(row, fam1, qual1,
incr, false);
assertEquals(value+incr, newVal);
// get both versions:
Get get = new Get(row);
get.setMaxVersions();
get.addColumn(fam1,qual1);
Result r = region.get(get, null);
assertEquals(2, r.size());
KeyValue first = r.raw()[0];
KeyValue second = r.raw()[1];
assertTrue("ICV failed to upgrade timestamp",
first.getTimestamp() != second.getTimestamp());
}
public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
initHRegion(tableName, getName(), fam1);
@ -1952,9 +2013,9 @@ public class TestHRegion extends HBaseTestCase {
FlushThread flushThread = new FlushThread();
flushThread.start();
Scan scan = new Scan();
scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("row0"))));
Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
// scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
// new BinaryComparator(Bytes.toBytes("row0"))));
int expectedCount = numFamilies * numQualifiers;
List<KeyValue> res = new ArrayList<KeyValue>();
@ -1967,7 +2028,7 @@ public class TestHRegion extends HBaseTestCase {
}
if (i != 0 && i % flushInterval == 0) {
//System.out.println("scan iteration = " + i);
//System.out.println("flush scan iteration = " + i);
flushThread.flush();
}
@ -1976,14 +2037,18 @@ public class TestHRegion extends HBaseTestCase {
InternalScanner scanner = region.getScanner(scan);
while (scanner.next(res)) ;
if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
Assert.assertEquals("i=" + i, expectedCount, res.size());
assertEquals("i=" + i, expectedCount, res.size());
long timestamp = res.get(0).getTimestamp();
Assert.assertTrue(timestamp >= prevTimestamp);
assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
timestamp >= prevTimestamp);
prevTimestamp = timestamp;
}
}
putThread.done();
region.flushcache();
putThread.join();
putThread.checkNoError();
@ -2028,15 +2093,16 @@ public class TestHRegion extends HBaseTestCase {
for (int r = 0; r < numRows; r++) {
byte[] row = Bytes.toBytes("row" + r);
Put put = new Put(row);
for (int f = 0; f < families.length; f++) {
for (int q = 0; q < qualifiers.length; q++) {
put.add(families[f], qualifiers[q], (long) val,
Bytes.toBytes(val));
for (byte[] family : families) {
for (byte[] qualifier : qualifiers) {
put.add(family, qualifier, (long) val,
Bytes.toBytes(val));
}
}
// System.out.println("Putting of kvsetsize=" + put.size());
region.put(put);
if (val > 0 && val % 47 == 0){
//System.out.println("put iteration = " + val);
if (val > 0 && val % 47 == 0) {
System.out.println("put iteration = " + val);
Delete delete = new Delete(row, (long)val-30, null);
region.delete(delete, null, true);
}
@ -2123,6 +2189,9 @@ public class TestHRegion extends HBaseTestCase {
}
putThread.done();
region.flushcache();
putThread.join();
putThread.checkNoError();

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@ -47,10 +48,12 @@ public class TestMemStore extends TestCase {
private static final byte [] CONTENTS = Bytes.toBytes("contents");
private static final byte [] BASIC = Bytes.toBytes("basic");
private static final String CONTENTSTR = "contentstr";
private ReadWriteConsistencyControl rwcc;
@Override
public void setUp() throws Exception {
super.setUp();
this.rwcc = new ReadWriteConsistencyControl();
this.memstore = new MemStore();
}
@ -76,6 +79,7 @@ public class TestMemStore extends TestCase {
KeyValueScanner [] memstorescanners = this.memstore.getScanners();
Scan scan = new Scan();
List<KeyValue> result = new ArrayList<KeyValue>();
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
int count = 0;
@ -94,6 +98,8 @@ public class TestMemStore extends TestCase {
for (int i = 0; i < memstorescanners.length; i++) {
memstorescanners[0].close();
}
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@ -138,9 +144,9 @@ public class TestMemStore extends TestCase {
if (count == snapshotIndex) {
this.memstore.snapshot();
this.memstore.clearSnapshot(this.memstore.getSnapshot());
// Added more rows into kvset.
// Added more rows into kvset. But the scanner won't see these rows.
addRows(this.memstore, ts);
LOG.info("Snapshotted, cleared it and then added values");
LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
}
result.clear();
}
@ -151,6 +157,181 @@ public class TestMemStore extends TestCase {
}
/**
* A simple test which verifies the 3 possible states when scanning across snapshot.
*/
public void testScanAcrossSnapshot2() {
// we are going to test scanning across a snapshot with two kvs
// kv1 should always be returned before kv2
final byte[] one = Bytes.toBytes(1);
final byte[] two = Bytes.toBytes(2);
final byte[] f = Bytes.toBytes("f");
final byte[] q = Bytes.toBytes("q");
final byte[] v = Bytes.toBytes(3);
final KeyValue kv1 = new KeyValue(one, f, q, v);
final KeyValue kv2 = new KeyValue(two, f, q, v);
// use case 1: both kvs in kvset
this.memstore.add(kv1.clone());
this.memstore.add(kv2.clone());
verifyScanAcrossSnapshot2(kv1, kv2);
// use case 2: both kvs in snapshot
this.memstore.snapshot();
verifyScanAcrossSnapshot2(kv1, kv2);
// use case 3: first in snapshot second in kvset
this.memstore = new MemStore();
this.memstore.add(kv1.clone());
this.memstore.snapshot();
this.memstore.add(kv2.clone());
verifyScanAcrossSnapshot2(kv1, kv2);
}
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
KeyValueScanner[] memstorescanners = this.memstore.getScanners();
assertEquals(1, memstorescanners.length);
final KeyValueScanner scanner = memstorescanners[0];
scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
assertEquals(kv1, scanner.next());
assertEquals(kv2, scanner.next());
assertNull(scanner.next());
}
private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
for (KeyValue kv : expected) {
assertEquals(0, KeyValue.COMPARATOR.compare(kv, scanner.next()));
}
assertNull(scanner.peek());
}
public void testMemstoreConcurrentControl() {
final byte[] row = Bytes.toBytes(1);
final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] v = Bytes.toBytes("value");
ReadWriteConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert();
KeyValue kv1 = new KeyValue(row, f, q1, v);
kv1.setMemstoreTS(w.getWriteNumber());
memstore.add(kv1);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
KeyValueScanner[] s = this.memstore.getScanners();
assertScannerResults(s[0], new KeyValue[]{});
rwcc.completeMemstoreInsert(w);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners();
assertScannerResults(s[0], new KeyValue[]{kv1});
w = rwcc.beginMemstoreInsert();
KeyValue kv2 = new KeyValue(row, f, q2, v);
kv2.setMemstoreTS(w.getWriteNumber());
memstore.add(kv2);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners();
assertScannerResults(s[0], new KeyValue[]{kv1});
rwcc.completeMemstoreInsert(w);
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
s = this.memstore.getScanners();
assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
}
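// A minimal restatement of the write/read protocol the test above
// exercises (a sketch; every call below already appears in this patch,
// only the helper's name and parameters are invented for illustration).
private void rwccProtocolSketch(byte[] row, byte[] f, byte[] q, byte[] v) {
// Writer side: stamp the edit with the entry's write number; readers
// cannot see it until the insert completes and the read point advances.
ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
KeyValue kv = new KeyValue(row, f, q, v);
kv.setMemstoreTS(w.getWriteNumber());
memstore.add(kv);
rwcc.completeMemstoreInsert(w);
// Reader side: pin this thread's read point before opening scanners;
// KeyValues stamped beyond the read point are filtered out.
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
KeyValueScanner scanner = this.memstore.getScanners()[0];
assertNotNull(scanner);
}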
private static class ReadOwnWritesTester extends Thread {
final int id;
static final int NUM_TRIES = 1000;
final byte[] row;
final byte[] f = Bytes.toBytes("family");
final byte[] q1 = Bytes.toBytes("q1");
final ReadWriteConsistencyControl rwcc;
final MemStore memstore;
AtomicReference<Throwable> caughtException;
public ReadOwnWritesTester(int id,
MemStore memstore,
ReadWriteConsistencyControl rwcc,
AtomicReference<Throwable> caughtException)
{
this.id = id;
this.rwcc = rwcc;
this.memstore = memstore;
this.caughtException = caughtException;
row = Bytes.toBytes(id);
}
public void run() {
try {
internalRun();
} catch (Throwable t) {
caughtException.compareAndSet(null, t);
}
}
private void internalRun() {
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
ReadWriteConsistencyControl.WriteEntry w =
rwcc.beginMemstoreInsert();
// Insert the sequence value (i)
byte[] v = Bytes.toBytes(i);
KeyValue kv = new KeyValue(row, f, q1, i, v);
kv.setMemstoreTS(w.getWriteNumber());
memstore.add(kv);
rwcc.completeMemstoreInsert(w);
// Assert that we can read back
KeyValueScanner s = this.memstore.getScanners()[0];
s.seek(kv);
KeyValue ret = s.next();
assertNotNull("Didnt find own write at all", ret);
assertEquals("Didnt read own writes",
kv.getTimestamp(), ret.getTimestamp());
}
}
}
public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
int NUM_THREADS = 8;
ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS];
AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
threads[i].start();
}
for (int i = 0; i < NUM_THREADS; i++) {
threads[i].join();
}
if (caught.get() != null) {
throw caught.get();
}
}
/**
* Test memstore snapshots
* @throws IOException
*/
@ -443,9 +624,10 @@ public class TestMemStore extends TestCase {
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(put3);
expected.add(del2);
expected.add(put2);
expected.add(put1);
assertEquals(3, memstore.kvset.size());
assertEquals(4, memstore.kvset.size());
int i = 0;
for(KeyValue kv : memstore.kvset) {
assertEquals(expected.get(i++), kv);
@ -477,8 +659,11 @@ public class TestMemStore extends TestCase {
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(put3);
expected.add(del2);
expected.add(put2);
expected.add(put1);
assertEquals(2, memstore.kvset.size());
assertEquals(4, memstore.kvset.size());
int i = 0;
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
@ -511,9 +696,14 @@ public class TestMemStore extends TestCase {
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(del);
expected.add(put1);
expected.add(put2);
expected.add(put4);
expected.add(put3);
assertEquals(2, memstore.kvset.size());
assertEquals(5, memstore.kvset.size());
int i = 0;
for (KeyValue kv: memstore.kvset) {
assertEquals(expected.get(i++), kv);
@ -529,7 +719,7 @@ public class TestMemStore extends TestCase {
memstore.add(new KeyValue(row, fam, qf, ts, val));
KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
memstore.delete(delete);
assertEquals(1, memstore.kvset.size());
assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
@ -542,7 +732,7 @@ public class TestMemStore extends TestCase {
"row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
memstore.delete(delete);
assertEquals(1, memstore.kvset.size());
assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteColumn() throws IOException {
@ -554,7 +744,7 @@ public class TestMemStore extends TestCase {
KeyValue.Type.DeleteColumn, "dont-care");
memstore.delete(delete);
assertEquals(1, memstore.kvset.size());
assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
public void testRetainsDeleteFamily() throws IOException {
@ -566,21 +756,21 @@ public class TestMemStore extends TestCase {
KeyValue.Type.DeleteFamily, "dont-care");
memstore.delete(delete);
assertEquals(1, memstore.kvset.size());
assertEquals(2, memstore.kvset.size());
assertEquals(delete, memstore.kvset.first());
}
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
private byte [] makeQualifier(final int i1, final int i2){
//////////////////////////////////////////////////////////////////////////////
private static byte [] makeQualifier(final int i1, final int i2){
return Bytes.toBytes(Integer.toString(i1) + ";" +
Integer.toString(i2));
}
/**
* Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
* Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} columns per row.
* @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException
@ -590,7 +780,7 @@ public class TestMemStore extends TestCase {
}
/**
* Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
* Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} columns per row.
* @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException
@ -644,4 +834,57 @@ public class TestMemStore extends TestCase {
return new KeyValue(row, Bytes.toBytes("test_col"), null,
HConstants.LATEST_TIMESTAMP, value);
}
private static void addRows(int count, final MemStore mem) {
long nanos = System.nanoTime();
for (int i = 0 ; i < count ; i++) {
if (i % 1000 == 0) {
System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000);
nanos = System.nanoTime();
}
long timestamp = System.currentTimeMillis();
for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
byte [] row = Bytes.toBytes(i);
byte [] qf = makeQualifier(i, ii);
mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
}
}
}
static void doScan(MemStore ms, int iteration) {
long nanos = System.nanoTime();
KeyValueScanner [] ss = ms.getScanners();
KeyValueScanner s = ss[0];
s.seek(KeyValue.createFirstOnRow(new byte[]{}));
System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
int cnt=0;
while(s.next() != null) ++cnt;
System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt);
}
public static void main(String [] args) {
ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
MemStore ms = new MemStore();
long n1 = System.nanoTime();
addRows(25000, ms);
System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
System.out.println("foo");
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
for (int i = 0 ; i < 50 ; i++)
doScan(ms, i);
}
}

View File

@ -0,0 +1,109 @@
package org.apache.hadoop.hbase.regionserver;
import junit.framework.TestCase;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class TestReadWriteConsistencyControl extends TestCase {
static class Writer implements Runnable {
final AtomicBoolean finished;
final ReadWriteConsistencyControl rwcc;
final AtomicBoolean status;
Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
this.finished = finished;
this.rwcc = rwcc;
this.status = status;
}
private Random rnd = new Random();
public boolean failed = false;
public void run() {
while (!finished.get()) {
ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
// System.out.println("Begin write: " + e.getWriteNumber());
// hold the write entry open for 0-499 usec; Thread.sleep(millis, nanos)
// takes nanoseconds as its second argument, hence the * 1000 below
int sleepTime = rnd.nextInt(500);
try {
if (sleepTime > 0)
Thread.sleep(0, sleepTime * 1000);
} catch (InterruptedException e1) {
}
try {
rwcc.completeMemstoreInsert(e);
} catch (RuntimeException ex) {
// report the failure and stop this writer
System.out.println(ex.toString());
ex.printStackTrace();
status.set(false);
return;
}
}
}
}
public void testParallelism() throws Exception {
final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
final AtomicBoolean finished = new AtomicBoolean(false);
// fail flag for the reader thread
final AtomicBoolean readerFailed = new AtomicBoolean(false);
final AtomicLong failedAt = new AtomicLong();
Runnable reader = new Runnable() {
public void run() {
long prev = rwcc.memstoreReadPoint();
while (!finished.get()) {
long newPrev = rwcc.memstoreReadPoint();
if (newPrev < prev) {
// serious problem.
System.out.println("Reader got out of order, prev: " +
prev + " next was: " + newPrev);
readerFailed.set(true);
// might as well give up
failedAt.set(newPrev);
return;
}
}
}
};
// writer thread parallelism.
int n = 20;
Thread [] writers = new Thread[n];
AtomicBoolean [] statuses = new AtomicBoolean[n];
Thread readThread = new Thread(reader);
for (int i = 0 ; i < n ; ++i ) {
statuses[i] = new AtomicBoolean(true);
writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
writers[i].start();
}
readThread.start();
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException ex) {
}
finished.set(true);
readThread.join();
for (int i = 0; i < n; ++i) {
writers[i].join();
}
// check failure.
assertFalse(readerFailed.get());
for (int i = 0; i < n; ++i) {
assertTrue(statuses[i].get());
}
}
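// The invariant checked above, stated directly (a sketch, not executed):
// the read point is monotone, so for two observations in program order
//   long r1 = rwcc.memstoreReadPoint();
//   long r2 = rwcc.memstoreReadPoint();
// it must hold that r2 >= r1 -- a write, once exposed to readers, is
// never retracted.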
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2009 The Apache Software Foundation
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -20,24 +20,23 @@
package org.apache.hadoop.hbase.regionserver;
import junit.framework.TestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import junit.framework.TestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class TestStoreScanner extends TestCase {
private static final String CF_STR = "cf";
final byte [] CF = Bytes.toBytes(CF_STR);
/**
/*
* Test utility for building a NavigableSet for scanners.
* @param strCols
* @return
@ -128,7 +127,7 @@ public class TestStoreScanner extends TestCase {
assertEquals(kvs[0], results.get(0));
}
/**
/*
* This test shows exactly how the matcher's return codes confuse the StoreScanner
* and prevent it from doing the right thing. Seeking once, then nexting twice
* should return R1, then R2, but in this case it doesn't.
@ -189,7 +188,7 @@ public class TestStoreScanner extends TestCase {
assertEquals(0, results.size());
}
/**
/*
* Test the case where there is a delete row 'in front of' the next row, the scanner
* will move to the next row.
*/
@ -407,9 +406,9 @@ public class TestStoreScanner extends TestCase {
results.clear();
assertEquals(false, scan.next(results));
}
/**
* Test expiration of KeyValues in combination with a configured TTL for
/*
* Test expiration of KeyValues in combination with a configured TTL for
* a column family (as should be triggered in a major compaction).
*/
public void testWildCardTtlScan() throws IOException {