HBASE-532 Odd interaction between HRegion.get, HRegion.deleteAll and compactions
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@648850 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dbd74f2f9a
commit
373761483f
|
@ -9,6 +9,7 @@ Hbase Change Log
|
|||
HBASE-575 master dies with stack overflow error if rootdir isn't qualified
|
||||
HBASE-582 HBase 554 forgot to clear results on each iteration caused by a filter
|
||||
(Clint Morgan via Stack)
|
||||
HBASE-532 Odd interaction between HRegion.get, HRegion.deleteAll and compactions
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-559 MR example job to count table rows
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.io.Text;
|
|||
|
||||
/**
|
||||
* Abstract base class that implements the InternalScanner.
|
||||
* Used by the concrete HMemcacheScanner and HStoreScanners
|
||||
*/
|
||||
public abstract class HAbstractScanner implements InternalScanner {
|
||||
final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
|
@ -72,7 +71,7 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
ColumnMatcher(final Text col) throws IOException {
|
||||
Text qualifier = HStoreKey.extractQualifier(col);
|
||||
try {
|
||||
if(qualifier == null || qualifier.getLength() == 0) {
|
||||
if (qualifier == null || qualifier.getLength() == 0) {
|
||||
this.matchType = MATCH_TYPE.FAMILY_ONLY;
|
||||
this.family = HStoreKey.extractFamily(col).toText();
|
||||
this.wildCardmatch = true;
|
||||
|
@ -111,12 +110,6 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
protected TreeMap<Text, Vector<ColumnMatcher>> okCols; // Holds matchers for each column family
|
||||
|
||||
protected boolean scannerClosed = false; // True when scanning is done
|
||||
|
||||
// Keys retrieved from the sources
|
||||
protected HStoreKey keys[];
|
||||
// Values that correspond to those keys
|
||||
protected byte [][] vals;
|
||||
|
||||
protected long timestamp; // The timestamp to match entries against
|
||||
private boolean wildcardMatch;
|
||||
private boolean multipleMatchers;
|
||||
|
@ -130,7 +123,7 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
for(int i = 0; i < targetCols.length; i++) {
|
||||
Text family = HStoreKey.extractFamily(targetCols[i]).toText();
|
||||
Vector<ColumnMatcher> matchers = okCols.get(family);
|
||||
if(matchers == null) {
|
||||
if (matchers == null) {
|
||||
matchers = new Vector<ColumnMatcher>();
|
||||
}
|
||||
ColumnMatcher matcher = new ColumnMatcher(targetCols[i]);
|
||||
|
@ -138,6 +131,10 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
this.wildcardMatch = true;
|
||||
}
|
||||
matchers.add(matcher);
|
||||
// TODO: Does this multipleMatchers matter any more now that scanners
|
||||
// are done at the store level? It might have mattered when scanners
|
||||
// could have been done at the region level when memcache was at the
|
||||
// region level rather than down here at store level.
|
||||
if (matchers.size() > 1) {
|
||||
this.multipleMatchers = true;
|
||||
}
|
||||
|
@ -146,21 +143,20 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
}
|
||||
|
||||
/**
|
||||
* For a particular column i, find all the matchers defined for the column.
|
||||
* For a particular column, find all the matchers defined for the column.
|
||||
* Compare the column family and column key using the matchers. The first one
|
||||
* that matches returns true. If no matchers are successful, return false.
|
||||
*
|
||||
* @param i index into the keys array
|
||||
* @return true - if any of the matchers for the column match the column family
|
||||
* and the column key.
|
||||
* @param column Column to test
|
||||
* @return true if any of the matchers for the column match the column family
|
||||
* and the column key.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean columnMatch(int i) throws IOException {
|
||||
Text column = keys[i].getColumn();
|
||||
protected boolean columnMatch(final Text column) throws IOException {
|
||||
Vector<ColumnMatcher> matchers =
|
||||
okCols.get(HStoreKey.extractFamily(column));
|
||||
if(matchers == null) {
|
||||
this.okCols.get(HStoreKey.extractFamily(column));
|
||||
if (matchers == null) {
|
||||
return false;
|
||||
}
|
||||
for(int m = 0; m < matchers.size(); m++) {
|
||||
|
@ -171,18 +167,6 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the user didn't want to start scanning at the first row, this method
|
||||
* seeks to the requested row.
|
||||
*/
|
||||
abstract boolean findFirstRow(int i, Text firstRow) throws IOException;
|
||||
|
||||
/** The concrete implementations provide a mechanism to find the next set of values */
|
||||
abstract boolean getNext(int i) throws IOException;
|
||||
|
||||
/** Mechanism used by concrete implementation to shut down a particular scanner */
|
||||
abstract void closeSubScanner(int i);
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public boolean isWildcardScanner() {
|
||||
return this.wildcardMatch;
|
||||
|
@ -193,87 +177,9 @@ public abstract class HAbstractScanner implements InternalScanner {
|
|||
return this.multipleMatchers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next set of values for this scanner.
|
||||
*
|
||||
* @param key The key that matched
|
||||
* @param results All the results for <code>key</code>
|
||||
* @return true if a match was found
|
||||
* @throws IOException
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
|
||||
*/
|
||||
public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
|
||||
throws IOException {
|
||||
if (scannerClosed) {
|
||||
return false;
|
||||
}
|
||||
// Find the next row label (and timestamp)
|
||||
Text chosenRow = null;
|
||||
long chosenTimestamp = -1;
|
||||
for(int i = 0; i < keys.length; i++) {
|
||||
if((keys[i] != null)
|
||||
&& (columnMatch(i))
|
||||
&& (keys[i].getTimestamp() <= this.timestamp)
|
||||
&& ((chosenRow == null)
|
||||
|| (keys[i].getRow().compareTo(chosenRow) < 0)
|
||||
|| ((keys[i].getRow().compareTo(chosenRow) == 0)
|
||||
&& (keys[i].getTimestamp() > chosenTimestamp)))) {
|
||||
chosenRow = new Text(keys[i].getRow());
|
||||
chosenTimestamp = keys[i].getTimestamp();
|
||||
}
|
||||
}
|
||||
public abstract boolean next(HStoreKey key, SortedMap<Text, byte []> results)
|
||||
throws IOException;
|
||||
|
||||
// Grab all the values that match this row/timestamp
|
||||
boolean insertedItem = false;
|
||||
if(chosenRow != null) {
|
||||
key.setRow(chosenRow);
|
||||
key.setVersion(chosenTimestamp);
|
||||
key.setColumn(new Text(""));
|
||||
|
||||
for(int i = 0; i < keys.length; i++) {
|
||||
// Fetch the data
|
||||
while((keys[i] != null)
|
||||
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
|
||||
|
||||
// If we are doing a wild card match or there are multiple matchers
|
||||
// per column, we need to scan all the older versions of this row
|
||||
// to pick up the rest of the family members
|
||||
|
||||
if(!wildcardMatch
|
||||
&& !multipleMatchers
|
||||
&& (keys[i].getTimestamp() != chosenTimestamp)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if(columnMatch(i)) {
|
||||
// We only want the first result for any specific family member
|
||||
if(!results.containsKey(keys[i].getColumn())) {
|
||||
results.put(new Text(keys[i].getColumn()), vals[i]);
|
||||
insertedItem = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(!getNext(i)) {
|
||||
closeSubScanner(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Advance the current scanner beyond the chosen row, to
|
||||
// a valid timestamp, so we're ready next time.
|
||||
|
||||
while((keys[i] != null)
|
||||
&& ((keys[i].getRow().compareTo(chosenRow) <= 0)
|
||||
|| (keys[i].getTimestamp() > this.timestamp)
|
||||
|| (! columnMatch(i)))) {
|
||||
getNext(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return insertedItem;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
|
||||
throw new UnsupportedOperationException("Unimplemented serverside. " +
|
||||
"next(HStoreKey, StortedMap(...) is more efficient");
|
||||
|
|
|
@ -611,24 +611,30 @@ public class HStore implements HConstants {
|
|||
* @throws IOException
|
||||
*/
|
||||
void flushCache(final long logCacheFlushId) throws IOException {
|
||||
internalFlushCache(memcache.getSnapshot(), logCacheFlushId);
|
||||
SortedMap<HStoreKey, byte []> cache = this.memcache.snapshot();
|
||||
internalFlushCache(cache, logCacheFlushId);
|
||||
// If an exception happens flushing, we let it out without clearing
|
||||
// the memcache snapshot. The old snapshot will be returned when we say
|
||||
// 'snapshot', the next time flush comes around.
|
||||
this.memcache.clearSnapshot(cache);
|
||||
}
|
||||
|
||||
private void internalFlushCache(SortedMap<HStoreKey, byte []> cache,
|
||||
long logCacheFlushId) throws IOException {
|
||||
|
||||
// Don't flush if there are no entries.
|
||||
if (cache.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: We can fail in the below block before we complete adding this
|
||||
// flush to list of store files. Add cleanup of anything put on filesystem
|
||||
// if we fail.
|
||||
synchronized(flushLock) {
|
||||
// A. Write the Maps out to the disk
|
||||
HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
|
||||
info.getEncodedName(), family.getFamilyName(), -1L, null);
|
||||
String name = flushedFile.toString();
|
||||
info.getEncodedName(), family.getFamilyName(), -1L, null);
|
||||
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
|
||||
this.bloomFilter);
|
||||
this.bloomFilter);
|
||||
|
||||
// Here we tried picking up an existing HStoreFile from disk and
|
||||
// interlacing the memcache flush compacting as we go. The notion was
|
||||
|
@ -678,7 +684,7 @@ public class HStore implements HConstants {
|
|||
flushedFile.getReader(this.fs, this.bloomFilter));
|
||||
this.storefiles.put(flushid, flushedFile);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Added " + name + " with " + entries +
|
||||
LOG.debug("Added " + flushedFile.toString() + " with " + entries +
|
||||
" entries, sequence id " + logCacheFlushId + ", data size " +
|
||||
StringUtils.humanReadableInt(cacheSize) + ", file size " +
|
||||
StringUtils.humanReadableInt(newStoreSize) + " for " +
|
||||
|
@ -1360,31 +1366,27 @@ public class HStore implements HConstants {
|
|||
*/
|
||||
Text getRowKeyAtOrBefore(final Text row)
|
||||
throws IOException{
|
||||
// map of HStoreKeys that are candidates for holding the row key that
|
||||
// most closely matches what we're looking for. we'll have to update it
|
||||
// Map of HStoreKeys that are candidates for holding the row key that
|
||||
// most closely matches what we're looking for. We'll have to update it
|
||||
// deletes found all over the place as we go along before finally reading
|
||||
// the best key out of it at the end.
|
||||
SortedMap<HStoreKey, Long> candidateKeys = new TreeMap<HStoreKey, Long>();
|
||||
|
||||
// obtain read lock
|
||||
// Obtain read lock
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
// Process each store file
|
||||
MapFile.Reader[] maparray = getReaders();
|
||||
|
||||
// process each store file
|
||||
for(int i = maparray.length - 1; i >= 0; i--) {
|
||||
for (int i = maparray.length - 1; i >= 0; i--) {
|
||||
// update the candidate keys from the current map file
|
||||
rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys);
|
||||
}
|
||||
|
||||
// finally, check the memcache
|
||||
memcache.getRowKeyAtOrBefore(row, candidateKeys);
|
||||
// Finally, check the memcache
|
||||
this.memcache.getRowKeyAtOrBefore(row, candidateKeys);
|
||||
|
||||
// return the best key from candidateKeys
|
||||
if (!candidateKeys.isEmpty()) {
|
||||
return candidateKeys.lastKey().getRow();
|
||||
}
|
||||
return null;
|
||||
// Return the best key from candidateKeys
|
||||
return candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
|
|
@ -30,13 +30,14 @@ import org.apache.hadoop.hbase.HStoreKey;
|
|||
* HStoreKeys and byte[] instead of RowResults. This is because they are
|
||||
* actually close to how the data is physically stored, and therefore it is more
|
||||
* convenient to interact with them that way. It is also much easier to merge
|
||||
* the results across SortedMaps that RowResults.
|
||||
* the results across SortedMaps than RowResults.
|
||||
*
|
||||
* Additionally, we need to be able to determine if the scanner is doing wildcard
|
||||
* column matches (when only a column family is specified or if a column regex
|
||||
* is specified) or if multiple members of the same column family were
|
||||
* specified. If so, we need to ignore the timestamp to ensure that we get all
|
||||
* the family members, as they may have been last updated at different times.
|
||||
* <p>Additionally, we need to be able to determine if the scanner is doing
|
||||
* wildcard column matches (when only a column family is specified or if a
|
||||
* column regex is specified) or if multiple members of the same column family
|
||||
* were specified. If so, we need to ignore the timestamp to ensure that we get
|
||||
* all the family members, as they may have been last updated at different
|
||||
* times.
|
||||
*/
|
||||
public interface InternalScanner extends Closeable {
|
||||
/**
|
||||
|
|
|
@ -21,59 +21,114 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.rmi.UnexpectedException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Set;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
|
||||
|
||||
/**
|
||||
* The Memcache holds in-memory modifications to the HRegion. This is really a
|
||||
* wrapper around a TreeMap that helps us when staging the Memcache out to disk.
|
||||
* The Memcache holds in-memory modifications to the HRegion.
|
||||
* Keeps a current map. When asked to flush the map, current map is moved
|
||||
* to snapshot and is cleared. We continue to serve edits out of new map
|
||||
* and backing snapshot until flusher reports in that the flush succeeded. At
|
||||
* this point we let the snapshot go.
|
||||
*/
|
||||
class Memcache {
|
||||
private final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||
|
||||
// Note that since these structures are always accessed with a lock held,
|
||||
// no additional synchronization is required.
|
||||
// so no additional synchronization is required.
|
||||
|
||||
@SuppressWarnings("hiding")
|
||||
private final SortedMap<HStoreKey, byte[]> memcache =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
|
||||
// The currently active sorted map of edits.
|
||||
private volatile SortedMap<HStoreKey, byte[]> memcache =
|
||||
createSynchronizedSortedMap();
|
||||
|
||||
volatile SortedMap<HStoreKey, byte[]> snapshot;
|
||||
// Snapshot of memcache. Made for flusher.
|
||||
private volatile SortedMap<HStoreKey, byte[]> snapshot =
|
||||
createSynchronizedSortedMap();
|
||||
|
||||
@SuppressWarnings("hiding")
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
||||
/*
|
||||
* Utility method.
|
||||
* @return sycnhronized sorted map of HStoreKey to byte arrays.
|
||||
*/
|
||||
public Memcache() {
|
||||
snapshot =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
|
||||
private static SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap() {
|
||||
return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a snapshot of the current Memcache
|
||||
* Must be followed by a call to {@link #clearSnapshot(SortedMap)}
|
||||
* @return Snapshot. Never null. May have no entries.
|
||||
*/
|
||||
void snapshot() {
|
||||
SortedMap<HStoreKey, byte[]> snapshot() {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
synchronized (memcache) {
|
||||
if (memcache.size() != 0) {
|
||||
snapshot.putAll(memcache);
|
||||
memcache.clear();
|
||||
}
|
||||
// If snapshot has entries, then flusher failed or didn't call cleanup.
|
||||
if (this.snapshot.size() > 0) {
|
||||
LOG.warn("Returning extant snapshot. Is there another ongoing " +
|
||||
"flush or did last attempt fail?");
|
||||
return this.snapshot;
|
||||
}
|
||||
// We used to synchronize on the memcache here but we're inside a
|
||||
// write lock so removed it. Comment is left in case removal was a
|
||||
// mistake. St.Ack
|
||||
if (this.memcache.size() != 0) {
|
||||
this.snapshot = this.memcache;
|
||||
this.memcache = createSynchronizedSortedMap();
|
||||
}
|
||||
return this.snapshot;
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the current snapshot.
|
||||
* @return Return snapshot.
|
||||
* @see {@link #snapshot()}
|
||||
* @see {@link #clearSnapshot(SortedMap)}
|
||||
*/
|
||||
SortedMap<HStoreKey, byte[]> getSnapshot() {
|
||||
return this.snapshot;
|
||||
}
|
||||
|
||||
/**
|
||||
* The passed snapshot was successfully persisted; it can be let go.
|
||||
* @param ss The snapshot to clean out.
|
||||
* @throws UnexpectedException
|
||||
* @see {@link #snapshot()}
|
||||
*/
|
||||
void clearSnapshot(final SortedMap<HStoreKey, byte []> ss)
|
||||
throws UnexpectedException {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
if (this.snapshot != ss) {
|
||||
throw new UnexpectedException("Current snapshot is " +
|
||||
this.snapshot + ", was passed " + ss);
|
||||
}
|
||||
// OK. Passed in snapshot is same as current snapshot. If not-empty,
|
||||
// create a new snapshot and let the old one go.
|
||||
if (ss.size() != 0) {
|
||||
this.snapshot = createSynchronizedSortedMap();
|
||||
}
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
|
@ -81,32 +136,14 @@ class Memcache {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return memcache snapshot
|
||||
*/
|
||||
SortedMap<HStoreKey, byte[]> getSnapshot() {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
SortedMap<HStoreKey, byte[]> currentSnapshot = snapshot;
|
||||
snapshot =
|
||||
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
|
||||
|
||||
return currentSnapshot;
|
||||
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a value.
|
||||
* Write an update
|
||||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
void add(final HStoreKey key, final byte[] value) {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
memcache.put(key, value);
|
||||
|
||||
this.memcache.put(key, value);
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
@ -122,20 +159,80 @@ class Memcache {
|
|||
this.lock.readLock().lock();
|
||||
try {
|
||||
List<Cell> results;
|
||||
synchronized (memcache) {
|
||||
results = internalGet(memcache, key, numVersions);
|
||||
// The synchronizations here are because internalGet iterates
|
||||
synchronized (this.memcache) {
|
||||
results = internalGet(this.memcache, key, numVersions);
|
||||
}
|
||||
synchronized (snapshot) {
|
||||
synchronized (this.snapshot) {
|
||||
results.addAll(results.size(),
|
||||
internalGet(snapshot, key, numVersions - results.size()));
|
||||
internalGet(this.snapshot, key, numVersions - results.size()));
|
||||
}
|
||||
return results;
|
||||
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param a
|
||||
* @param b
|
||||
* @return Return lowest of a or b or null if both a and b are null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private WritableComparable getLowest(final WritableComparable a,
|
||||
final WritableComparable b) {
|
||||
if (a == null) {
|
||||
return b;
|
||||
}
|
||||
if (b == null) {
|
||||
return a;
|
||||
}
|
||||
return a.compareTo(b) <= 0? a: b;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param row Find the row that comes after this one.
|
||||
* @return Next row or null if none found
|
||||
*/
|
||||
Text getNextRow(final Text row) {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
return (Text)getLowest(getNextRow(row, this.memcache),
|
||||
getNextRow(row, this.snapshot));
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @param row Find row that follows this one.
|
||||
* @param map Map to look in for a row beyond <code>row</code>.
|
||||
* This method synchronizes on passed map while iterating it.
|
||||
* @return Next row or null if none found.
|
||||
*/
|
||||
private Text getNextRow(final Text row,
|
||||
final SortedMap<HStoreKey, byte []> map) {
|
||||
Text result = null;
|
||||
// Synchronize on the map to make the tailMap making 'safe'.
|
||||
synchronized (map) {
|
||||
// Make an HSK with maximum timestamp so we get past most of the current
|
||||
// rows cell entries.
|
||||
HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP);
|
||||
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk);
|
||||
// Iterate until we fall into the next row; i.e. move off current row
|
||||
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
|
||||
HStoreKey itKey = es.getKey();
|
||||
if (itKey.getRow().compareTo(row) <= 0) {
|
||||
continue;
|
||||
}
|
||||
// Note: Not suppressing deletes.
|
||||
result = itKey.getRow();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all the available columns for the given key. The key indicates a
|
||||
* row and timestamp, but not a column name.
|
||||
|
@ -148,13 +245,13 @@ class Memcache {
|
|||
Map<Text, Cell> results) {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
synchronized (memcache) {
|
||||
internalGetFull(memcache, key, columns, deletes, results);
|
||||
// The synchronizations here are because internalGet iterates
|
||||
synchronized (this.memcache) {
|
||||
internalGetFull(this.memcache, key, columns, deletes, results);
|
||||
}
|
||||
synchronized (snapshot) {
|
||||
internalGetFull(snapshot, key, columns, deletes, results);
|
||||
synchronized (this.snapshot) {
|
||||
internalGetFull(this.snapshot, key, columns, deletes, results);
|
||||
}
|
||||
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
@ -178,7 +275,7 @@ class Memcache {
|
|||
if (HLogEdit.isDeleted(val)) {
|
||||
if (!deletes.containsKey(itCol)
|
||||
|| deletes.get(itCol).longValue() < itKey.getTimestamp()) {
|
||||
deletes.put(new Text(itCol), itKey.getTimestamp());
|
||||
deletes.put(new Text(itCol), Long.valueOf(itKey.getTimestamp()));
|
||||
}
|
||||
} else if (!(deletes.containsKey(itCol)
|
||||
&& deletes.get(itCol).longValue() >= itKey.getTimestamp())) {
|
||||
|
@ -192,13 +289,13 @@ class Memcache {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param row
|
||||
* @param timestamp
|
||||
* @param row Row to look for.
|
||||
* @param candidateKeys Map of candidate keys (Accumulation over lots of
|
||||
* lookup over stores and memcaches)
|
||||
*/
|
||||
void getRowKeyAtOrBefore(final Text row,
|
||||
SortedMap<HStoreKey, Long> candidateKeys) {
|
||||
this.lock.readLock().lock();
|
||||
|
||||
try {
|
||||
synchronized (memcache) {
|
||||
internalGetRowKeyAtOrBefore(memcache, row, candidateKeys);
|
||||
|
@ -313,7 +410,7 @@ class Memcache {
|
|||
}
|
||||
} else {
|
||||
candidateKeys.put(stripTimestamp(found_key),
|
||||
found_key.getTimestamp());
|
||||
Long.valueOf(found_key.getTimestamp()));
|
||||
}
|
||||
} while (key_iterator.hasNext());
|
||||
}
|
||||
|
@ -451,22 +548,11 @@ class Memcache {
|
|||
* @return a scanner over the keys in the Memcache
|
||||
*/
|
||||
InternalScanner getScanner(long timestamp,
|
||||
Text targetCols[], Text firstRow) throws IOException {
|
||||
|
||||
// Here we rely on ReentrantReadWriteLock's ability to acquire multiple
|
||||
// locks by the same thread and to be able to downgrade a write lock to
|
||||
// a read lock. We need to hold a lock throughout this method, but only
|
||||
// need the write lock while creating the memcache snapshot
|
||||
|
||||
this.lock.writeLock().lock(); // hold write lock during memcache snapshot
|
||||
snapshot(); // snapshot memcache
|
||||
this.lock.readLock().lock(); // acquire read lock
|
||||
this.lock.writeLock().unlock(); // downgrade to read lock
|
||||
Text targetCols[], Text firstRow)
|
||||
throws IOException {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
// Prevent a cache flush while we are constructing the scanner
|
||||
|
||||
return new MemcacheScanner(timestamp, targetCols, firstRow);
|
||||
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
@ -477,112 +563,66 @@ class Memcache {
|
|||
// It lets the caller scan the contents of the Memcache.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
class MemcacheScanner extends HAbstractScanner {
|
||||
SortedMap<HStoreKey, byte []> backingMap;
|
||||
Iterator<HStoreKey> keyIterator;
|
||||
private class MemcacheScanner extends HAbstractScanner {
|
||||
private Text currentRow;
|
||||
private final Set<Text> columns;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
MemcacheScanner(final long timestamp, final Text targetCols[],
|
||||
final Text firstRow) throws IOException {
|
||||
|
||||
final Text firstRow)
|
||||
throws IOException {
|
||||
// Call to super will create ColumnMatchers and whether this is a regex
|
||||
// scanner or not. Will also save away timestamp. Also sorts rows.
|
||||
super(timestamp, targetCols);
|
||||
try {
|
||||
this.backingMap = new TreeMap<HStoreKey, byte[]>();
|
||||
this.backingMap.putAll(snapshot);
|
||||
this.keys = new HStoreKey[1];
|
||||
this.vals = new byte[1][];
|
||||
this.currentRow = firstRow;
|
||||
// If we're being asked to scan explicit columns rather than all in
|
||||
// a family or columns that match regexes, cache the sorted array of
|
||||
// columns.
|
||||
this.columns = this.isWildcardScanner()? this.okCols.keySet(): null;
|
||||
}
|
||||
|
||||
// Generate list of iterators
|
||||
|
||||
HStoreKey firstKey = new HStoreKey(firstRow);
|
||||
if (firstRow != null && firstRow.getLength() != 0) {
|
||||
keyIterator =
|
||||
backingMap.tailMap(firstKey).keySet().iterator();
|
||||
|
||||
} else {
|
||||
keyIterator = backingMap.keySet().iterator();
|
||||
}
|
||||
|
||||
while (getNext(0)) {
|
||||
if (!findFirstRow(0, firstRow)) {
|
||||
@Override
|
||||
public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
|
||||
throws IOException {
|
||||
if (this.scannerClosed) {
|
||||
return false;
|
||||
}
|
||||
Map<Text, Long> deletes = new HashMap<Text, Long>();
|
||||
// Catch all row results in here. These results are ten filtered to
|
||||
// ensure they match column name regexes, or if none, added to results.
|
||||
Map<Text, Cell> rowResults = new HashMap<Text, Cell>();
|
||||
if (results.size() > 0) {
|
||||
results.clear();
|
||||
}
|
||||
while (results.size() <= 0 &&
|
||||
(this.currentRow = getNextRow(this.currentRow)) != null) {
|
||||
if (deletes.size() > 0) {
|
||||
deletes.clear();
|
||||
}
|
||||
if (rowResults.size() > 0) {
|
||||
rowResults.clear();
|
||||
}
|
||||
key.setRow(this.currentRow);
|
||||
key.setVersion(this.timestamp);
|
||||
getFull(key, isWildcardScanner()? null: this.columns, deletes, rowResults);
|
||||
for (Map.Entry<Text, Cell> e: rowResults.entrySet()) {
|
||||
Text column = e.getKey();
|
||||
Cell c = e.getValue();
|
||||
if (isWildcardScanner()) {
|
||||
// Check the results match. We only check columns, not timestamps.
|
||||
// We presume that timestamps have been handled properly when we
|
||||
// called getFull.
|
||||
if (!columnMatch(column)) {
|
||||
continue;
|
||||
}
|
||||
if (columnMatch(0)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException ex) {
|
||||
LOG.error("error initializing Memcache scanner: ", ex);
|
||||
close();
|
||||
IOException e = new IOException("error initializing Memcache scanner");
|
||||
e.initCause(ex);
|
||||
throw e;
|
||||
|
||||
} catch(IOException ex) {
|
||||
LOG.error("error initializing Memcache scanner: ", ex);
|
||||
close();
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The user didn't want to start scanning at the first row. This method
|
||||
* seeks to the requested row.
|
||||
*
|
||||
* @param i which iterator to advance
|
||||
* @param firstRow seek to this row
|
||||
* @return true if this is the first row
|
||||
*/
|
||||
@Override
|
||||
boolean findFirstRow(int i, Text firstRow) {
|
||||
return firstRow.getLength() == 0 ||
|
||||
keys[i].getRow().compareTo(firstRow) >= 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next value from the specified iterator.
|
||||
*
|
||||
* @param i Which iterator to fetch next value from
|
||||
* @return true if there is more data available
|
||||
*/
|
||||
@Override
|
||||
boolean getNext(int i) {
|
||||
boolean result = false;
|
||||
while (true) {
|
||||
if (!keyIterator.hasNext()) {
|
||||
closeSubScanner(i);
|
||||
break;
|
||||
}
|
||||
// Check key is < than passed timestamp for this scanner.
|
||||
HStoreKey hsk = keyIterator.next();
|
||||
if (hsk == null) {
|
||||
throw new NullPointerException("Unexpected null key");
|
||||
}
|
||||
if (hsk.getTimestamp() <= this.timestamp) {
|
||||
this.keys[i] = hsk;
|
||||
this.vals[i] = backingMap.get(keys[i]);
|
||||
result = true;
|
||||
break;
|
||||
results.put(column, c.getValue());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return results.size() > 0;
|
||||
}
|
||||
|
||||
/** Shut down an individual map iterator. */
|
||||
@Override
|
||||
void closeSubScanner(int i) {
|
||||
keyIterator = null;
|
||||
keys[i] = null;
|
||||
vals[i] = null;
|
||||
backingMap = null;
|
||||
}
|
||||
|
||||
/** Shut down map iterators */
|
||||
public void close() {
|
||||
if (!scannerClosed) {
|
||||
if(keyIterator != null) {
|
||||
closeSubScanner(0);
|
||||
}
|
||||
scannerClosed = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.SortedMap;
|
||||
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
@ -28,14 +29,19 @@ import org.apache.hadoop.hbase.HStoreKey;
|
|||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
|
||||
/**
|
||||
* A scanner that iterates through the HStore files
|
||||
* A scanner that iterates through HStore files
|
||||
*/
|
||||
class StoreFileScanner extends HAbstractScanner {
|
||||
@SuppressWarnings("hiding")
|
||||
// Keys retrieved from the sources
|
||||
private HStoreKey keys[];
|
||||
// Values that correspond to those keys
|
||||
private byte [][] vals;
|
||||
|
||||
private MapFile.Reader[] readers;
|
||||
private HStore store;
|
||||
|
||||
public StoreFileScanner(HStore store, long timestamp, Text[] targetCols, Text firstRow)
|
||||
public StoreFileScanner(final HStore store, final long timestamp,
|
||||
final Text[] targetCols, final Text firstRow)
|
||||
throws IOException {
|
||||
super(timestamp, targetCols);
|
||||
this.store = store;
|
||||
|
@ -54,13 +60,11 @@ class StoreFileScanner extends HAbstractScanner {
|
|||
// Advance the readers to the first pos.
|
||||
for(i = 0; i < readers.length; i++) {
|
||||
keys[i] = new HStoreKey();
|
||||
|
||||
if(firstRow.getLength() != 0) {
|
||||
if(findFirstRow(i, firstRow)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
while(getNext(i)) {
|
||||
if(columnMatch(i)) {
|
||||
break;
|
||||
|
@ -76,15 +80,109 @@ class StoreFileScanner extends HAbstractScanner {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For a particular column i, find all the matchers defined for the column.
|
||||
* Compare the column family and column key using the matchers. The first one
|
||||
* that matches returns true. If no matchers are successful, return false.
|
||||
*
|
||||
* @param i index into the keys array
|
||||
* @return true if any of the matchers for the column match the column family
|
||||
* and the column key.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean columnMatch(int i) throws IOException {
|
||||
return columnMatch(keys[i].getColumn());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next set of values for this scanner.
|
||||
*
|
||||
* @param key The key that matched
|
||||
* @param results All the results for <code>key</code>
|
||||
* @return true if a match was found
|
||||
* @throws IOException
|
||||
*
|
||||
* @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
|
||||
*/
|
||||
@Override
|
||||
public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
|
||||
throws IOException {
|
||||
if (scannerClosed) {
|
||||
return false;
|
||||
}
|
||||
// Find the next row label (and timestamp)
|
||||
Text chosenRow = null;
|
||||
long chosenTimestamp = -1;
|
||||
for(int i = 0; i < keys.length; i++) {
|
||||
if((keys[i] != null)
|
||||
&& (columnMatch(i))
|
||||
&& (keys[i].getTimestamp() <= this.timestamp)
|
||||
&& ((chosenRow == null)
|
||||
|| (keys[i].getRow().compareTo(chosenRow) < 0)
|
||||
|| ((keys[i].getRow().compareTo(chosenRow) == 0)
|
||||
&& (keys[i].getTimestamp() > chosenTimestamp)))) {
|
||||
chosenRow = new Text(keys[i].getRow());
|
||||
chosenTimestamp = keys[i].getTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
// Grab all the values that match this row/timestamp
|
||||
boolean insertedItem = false;
|
||||
if(chosenRow != null) {
|
||||
key.setRow(chosenRow);
|
||||
key.setVersion(chosenTimestamp);
|
||||
key.setColumn(new Text(""));
|
||||
|
||||
for(int i = 0; i < keys.length; i++) {
|
||||
// Fetch the data
|
||||
while((keys[i] != null)
|
||||
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
|
||||
|
||||
// If we are doing a wild card match or there are multiple matchers
|
||||
// per column, we need to scan all the older versions of this row
|
||||
// to pick up the rest of the family members
|
||||
|
||||
if(!isWildcardScanner()
|
||||
&& !isMultipleMatchScanner()
|
||||
&& (keys[i].getTimestamp() != chosenTimestamp)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if(columnMatch(i)) {
|
||||
// We only want the first result for any specific family member
|
||||
if(!results.containsKey(keys[i].getColumn())) {
|
||||
results.put(new Text(keys[i].getColumn()), vals[i]);
|
||||
insertedItem = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(!getNext(i)) {
|
||||
closeSubScanner(i);
|
||||
}
|
||||
}
|
||||
|
||||
// Advance the current scanner beyond the chosen row, to
|
||||
// a valid timestamp, so we're ready next time.
|
||||
|
||||
while((keys[i] != null)
|
||||
&& ((keys[i].getRow().compareTo(chosenRow) <= 0)
|
||||
|| (keys[i].getTimestamp() > this.timestamp)
|
||||
|| (! columnMatch(i)))) {
|
||||
getNext(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return insertedItem;
|
||||
}
|
||||
|
||||
/**
|
||||
* The user didn't want to start scanning at the first row. This method
|
||||
* seeks to the requested row.
|
||||
*
|
||||
* @param i - which iterator to advance
|
||||
* @param firstRow - seek to this row
|
||||
* @return - true if this is the first row or if the row was not found
|
||||
* @param i which iterator to advance
|
||||
* @param firstRow seek to this row
|
||||
* @return true if this is the first row or if the row was not found
|
||||
*/
|
||||
@Override
|
||||
boolean findFirstRow(int i, Text firstRow) throws IOException {
|
||||
ImmutableBytesWritable ibw = new ImmutableBytesWritable();
|
||||
HStoreKey firstKey
|
||||
|
@ -104,10 +202,9 @@ class StoreFileScanner extends HAbstractScanner {
|
|||
/**
|
||||
* Get the next value from the specified reader.
|
||||
*
|
||||
* @param i - which reader to fetch next value from
|
||||
* @return - true if there is more data available
|
||||
* @param i which reader to fetch next value from
|
||||
* @return true if there is more data available
|
||||
*/
|
||||
@Override
|
||||
boolean getNext(int i) throws IOException {
|
||||
boolean result = false;
|
||||
ImmutableBytesWritable ibw = new ImmutableBytesWritable();
|
||||
|
@ -126,7 +223,6 @@ class StoreFileScanner extends HAbstractScanner {
|
|||
}
|
||||
|
||||
/** Close down the indicated reader. */
|
||||
@Override
|
||||
void closeSubScanner(int i) {
|
||||
try {
|
||||
if(readers[i] != null) {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.rmi.UnexpectedException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
|
@ -129,13 +130,13 @@ public class TestHMemcache extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private void runSnapshot(final Memcache hmc) {
|
||||
private void runSnapshot(final Memcache hmc) throws UnexpectedException {
|
||||
// Save off old state.
|
||||
int oldHistorySize = hmc.snapshot.size();
|
||||
hmc.snapshot();
|
||||
int oldHistorySize = hmc.getSnapshot().size();
|
||||
SortedMap<HStoreKey, byte[]> ss = hmc.snapshot();
|
||||
// Make some assertions about what just happened.
|
||||
assertTrue("History size has not increased",
|
||||
oldHistorySize < hmc.snapshot.size());
|
||||
assertTrue("History size has not increased", oldHistorySize < ss.size());
|
||||
hmc.clearSnapshot(ss);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -148,9 +149,8 @@ public class TestHMemcache extends TestCase {
|
|||
for (int i = 0; i < snapshotCount; i++) {
|
||||
addRows(this.hmemcache);
|
||||
runSnapshot(this.hmemcache);
|
||||
this.hmemcache.getSnapshot();
|
||||
assertEquals("History not being cleared", 0,
|
||||
this.hmemcache.snapshot.size());
|
||||
SortedMap<HStoreKey, byte[]> ss = this.hmemcache.getSnapshot();
|
||||
assertEquals("History not being cleared", 0, ss.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -195,6 +195,40 @@ public class TestHMemcache extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/** Test getNextRow from memcache
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public void testGetNextRow() throws UnsupportedEncodingException {
|
||||
addRows(this.hmemcache);
|
||||
Text closestToEmpty = this.hmemcache.getNextRow(HConstants.EMPTY_TEXT);
|
||||
assertEquals(closestToEmpty, getRowName(0));
|
||||
for (int i = 0; i < ROW_COUNT; i++) {
|
||||
Text nr = this.hmemcache.getNextRow(getRowName(i));
|
||||
if (i + 1 == ROW_COUNT) {
|
||||
assertEquals(nr, null);
|
||||
} else {
|
||||
assertEquals(nr, getRowName(i + 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Test getClosest from memcache
|
||||
* @throws UnsupportedEncodingException
|
||||
*/
|
||||
public void testGetClosest() throws UnsupportedEncodingException {
|
||||
addRows(this.hmemcache);
|
||||
Text closestToEmpty = this.hmemcache.getNextRow(HConstants.EMPTY_TEXT);
|
||||
assertEquals(closestToEmpty, getRowName(0));
|
||||
for (int i = 0; i < ROW_COUNT; i++) {
|
||||
Text nr = this.hmemcache.getNextRow(getRowName(i));
|
||||
if (i + 1 == ROW_COUNT) {
|
||||
assertEquals(nr, null);
|
||||
} else {
|
||||
assertEquals(nr, getRowName(i + 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test memcache scanner
|
||||
* @throws IOException
|
||||
|
@ -272,7 +306,4 @@ public class TestHMemcache extends TestCase {
|
|||
private HStoreKey getHSKForRow(Text row) {
|
||||
return new HStoreKey(row, new Text("test_col:"), HConstants.LATEST_TIMESTAMP);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue