HBASE-588 Still a 'hole' in scanners, even after HBASE-532

Add a ChangedReadersObserver interface.  HStore notifies registered
observers when list of HStoreFile Readers changes -- at flush time
and at compaction time.  Scanners are only current observers.

Also fix a deadlock flushing by changing lock types and moving flush
request out from under lock.

M  src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    (getCacheFlushListern): Renamed as getFlushRequester
M  src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    Added synchronizations on this.storefiles.
    (activeScanners, newScannerLock): Removed.  We no longer try to
    block out scanners when compacting (Turns out same functionality
    for blocking scanners is up in HRegion used around closing/split).
    (changedReadersObservers): Added.
    (updateReaders): New method that manages the insertion of new
    reader on flush.  Also calls new notifyChangedReadersObservers.
    (notifyChagnedReadersObservers, addChangedReadersObserver,
      deleteChangedReadersObservers): Added.
    (completeCompaction): Previous deleting old store files, we'd
    remove from this.storefiles and delete all in one step; now we
    do the remove first, notify all observers of readers, and then
    do the delete so observers have a chance to clean up any old
    references to files about to be deleted.  Removed all the lockout
    of new scanner creation and wait on old scanners to come int.
    (updateActiveScanners): Removed.
    (getStorefiles): Accessor.  Added.
M  src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    Added implementation of new ChangedReadersObserver interface.
    Added a lock that we hold when 'nexting' and when changing the
    set of readers out from under the scanner.
    Changed the constructor moving bulk into new openReaders method
    that we reuse when list of Readers changes.
    (next): Moved head of this method into new getNextViableRow (used
    to be called chosenRow and chosenTimestamp).  New method returns
    simple datastructure of row and timestamp (ViableRow).
    (close): On close, remove ourselves as ChangedReadersObserver (we
    added ourselves in the constructor).
    (updateReaders): Changed the set of Readers out from under the
    Scanner.
A  src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
    Added.
M  src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    Changed name of the interface we implement from CacheFlushListener to
    FlushRequester.
D  src/java/org/apache/hadoop/hbase/regionserver/CacheFlushListener.java
    Renamed as FlushRequester.
M  src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
    Remove update of activeScanners.
A  src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
    Added.  Rename of CacheFlushListener.
M  src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    Renamed method getCacheFlushListener as getFlushRequester.
M src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java
    Formatting.
M  src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    CacheFlushListener was renamed as FlushListener.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@651017 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-04-23 19:21:26 +00:00
parent 3607d3e0db
commit e1da900df8
10 changed files with 464 additions and 328 deletions

View File

@ -0,0 +1,34 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
/**
* If set of MapFile.Readers in Store change, implementors are notified.
*/
public interface ChangedReadersObserver {
/**
* Notify observers.
*/
void updateReaders() throws IOException;
}

View File

@ -22,15 +22,15 @@ package org.apache.hadoop.hbase.regionserver;
/**
* Implementors of this interface want to be notified when an HRegion
* determines that a cache flush is needed. A CacheFlushListener (or null)
* must be passed to the HRegion constructor.
* determines that a cache flush is needed. A FlushRequester (or null)
* must be passed to the HRegion constructor so it knows who to call when it
* has a filled memcache.
*/
public interface CacheFlushListener {
public interface FlushRequester {
/**
* Tell the listener the cache needs to be flushed.
*
* @param region the HRegion requesting the cache flush
*/
void flushRequested(HRegion region);
}
void request(HRegion region);
}

View File

@ -38,8 +38,11 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
/** Flush cache upon request */
class Flusher extends Thread implements CacheFlushListener {
/**
* Thread that flushes cache on request
* @see FlushRequester
*/
class Flusher extends Thread implements FlushRequester {
static final Log LOG = LogFactory.getLog(Flusher.class);
private final BlockingQueue<HRegion> flushQueue =
new LinkedBlockingQueue<HRegion>();
@ -110,13 +113,8 @@ class Flusher extends Thread implements CacheFlushListener {
}
/** {@inheritDoc} */
public void flushRequested(HRegion r) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
regionsInQueue.add(r);
flushQueue.add(r);
}
}
public void request(HRegion r) {
addRegion(r, System.currentTimeMillis());
}
/**
@ -204,18 +202,41 @@ class Flusher extends Thread implements CacheFlushListener {
// Queue up regions for optional flush if they need it
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion region: regions) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(region) &&
(now - optionalFlushPeriod) > region.getLastFlushTime()) {
regionsInQueue.add(region);
flushQueue.add(region);
region.setLastFlushTime(now);
}
}
optionallyAddRegion(region, now);
}
}
}
/*
* Add region if not already added and if optional flush period has been
* exceeded.
* @param r Region to add.
* @param now The 'now' to use. Set last flush time to this value.
*/
private void optionallyAddRegion(final HRegion r, final long now) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r) &&
(now - optionalFlushPeriod) > r.getLastFlushTime()) {
addRegion(r, now);
}
}
}
/*
* Add region if not already added.
* @param r Region to add.
* @param now The 'now' to use. Set last flush time to this value.
*/
private void addRegion(final HRegion r, final long now) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
regionsInQueue.add(r);
flushQueue.add(r);
r.setLastFlushTime(now);
}
}
}
/**
* Check if the regionserver's memcache memory usage is greater than the
* limit. If so, flush regions with the biggest memcaches until we're down

View File

@ -107,10 +107,15 @@ public abstract class HAbstractScanner implements InternalScanner {
}
}
protected TreeMap<Text, Vector<ColumnMatcher>> okCols; // Holds matchers for each column family
// Holds matchers for each column family
protected TreeMap<Text, Vector<ColumnMatcher>> okCols;
// True when scanning is done
protected volatile boolean scannerClosed = false;
// The timestamp to match entries against
protected long timestamp;
protected boolean scannerClosed = false; // True when scanning is done
protected long timestamp; // The timestamp to match entries against
private boolean wildcardMatch;
private boolean multipleMatchers;

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.WrongRegionException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@ -341,18 +342,22 @@ public class HRegion implements HConstants {
volatile boolean writesEnabled = true;
}
volatile WriteState writestate = new WriteState();
private volatile WriteState writestate = new WriteState();
final int memcacheFlushSize;
private volatile long lastFlushTime;
final CacheFlushListener flushListener;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Integer updateLock = new Integer(0);
final FlushRequester flushListener;
private final int blockingMemcacheSize;
final long threadWakeFrequency;
// Used to guard splits and closes
private final ReentrantReadWriteLock splitsAndClosesLock =
new ReentrantReadWriteLock();
// Stop updates lock
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
private final Integer splitLock = new Integer(0);
private final long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
private final AtomicInteger activeScannerCount = new AtomicInteger(0);
//////////////////////////////////////////////////////////////////////////////
// Constructor
@ -381,7 +386,7 @@ public class HRegion implements HConstants {
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, Path initialFiles,
CacheFlushListener flushListener) throws IOException {
FlushRequester flushListener) throws IOException {
this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
}
@ -410,7 +415,7 @@ public class HRegion implements HConstants {
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, Path initialFiles,
CacheFlushListener flushListener, final Progressable reporter)
FlushRequester flushListener, final Progressable reporter)
throws IOException {
this.basedir = basedir;
@ -566,20 +571,17 @@ public class HRegion implements HConstants {
}
}
}
lock.writeLock().lock();
LOG.debug("new updates and scanners for region " + regionName +
" disabled");
splitsAndClosesLock.writeLock().lock();
LOG.debug("Updates and scanners for region " + regionName + " disabled");
try {
// Wait for active scanners to finish. The write lock we hold will prevent
// new scanners from being created.
// Wait for active scanners to finish. The write lock we hold will
// prevent new scanners from being created.
synchronized (activeScannerCount) {
while (activeScannerCount.get() != 0) {
LOG.debug("waiting for " + activeScannerCount.get() +
" scanners to finish");
try {
activeScannerCount.wait();
} catch (InterruptedException e) {
// continue
}
@ -620,7 +622,7 @@ public class HRegion implements HConstants {
LOG.info("closed " + this.regionInfo.getRegionName());
return result;
} finally {
lock.writeLock().unlock();
splitsAndClosesLock.writeLock().unlock();
}
}
}
@ -868,10 +870,8 @@ public class HRegion implements HConstants {
}
}
doRegionCompactionCleanup();
LOG.info("compaction completed on region " + getRegionName() +
". Took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
LOG.info("compaction completed on region " + getRegionName() + " in " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
} finally {
synchronized (writestate) {
writestate.compacting = false;
@ -919,11 +919,12 @@ public class HRegion implements HConstants {
}
}
try {
lock.readLock().lock(); // Prevent splits and closes
// Prevent splits and closes
splitsAndClosesLock.readLock().lock();
try {
return internalFlushcache();
} finally {
lock.readLock().unlock();
splitsAndClosesLock.readLock().unlock();
}
} finally {
synchronized (writestate) {
@ -984,10 +985,13 @@ public class HRegion implements HConstants {
// to do this for a moment. Its quick. The subsequent sequence id that
// goes into the HLog after we've flushed all these snapshots also goes
// into the info file that sits beside the flushed files.
synchronized (updateLock) {
this.updatesLock.writeLock().lock();
try {
for (HStore s: stores.values()) {
s.snapshot();
}
} finally {
this.updatesLock.writeLock().unlock();
}
long sequenceId = log.startCacheFlush();
@ -1150,7 +1154,7 @@ public class HRegion implements HConstants {
HStoreKey key = null;
checkRow(row);
lock.readLock().lock();
splitsAndClosesLock.readLock().lock();
try {
// examine each column family for the preceeding or matching key
for(Text colFamily : stores.keySet()){
@ -1188,7 +1192,7 @@ public class HRegion implements HConstants {
return new RowResult(key.getRow(), cellsWritten);
} finally {
lock.readLock().unlock();
splitsAndClosesLock.readLock().unlock();
}
}
@ -1235,7 +1239,7 @@ public class HRegion implements HConstants {
public InternalScanner getScanner(Text[] cols, Text firstRow,
long timestamp, RowFilterInterface filter)
throws IOException {
lock.readLock().lock();
splitsAndClosesLock.readLock().lock();
try {
if (this.closed.get()) {
throw new IOException("Region " + this.getRegionName().toString() +
@ -1257,7 +1261,7 @@ public class HRegion implements HConstants {
return new HScanner(cols, firstRow, timestamp,
storelist.toArray(new HStore [storelist.size()]), filter);
} finally {
lock.readLock().unlock();
splitsAndClosesLock.readLock().unlock();
}
}
@ -1506,15 +1510,15 @@ public class HRegion implements HConstants {
* @throws IOException
*/
private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
throws IOException {
throws IOException {
if (updatesByColumn == null || updatesByColumn.size() <= 0) {
return;
}
synchronized (updateLock) { // prevent a cache flush
boolean flush = false;
this.updatesLock.readLock().lock();
try {
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), updatesByColumn);
regionInfo.getTableDesc().getName(), updatesByColumn);
long size = 0;
for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
@ -1522,12 +1526,15 @@ public class HRegion implements HConstants {
size = this.memcacheSize.addAndGet(getEntrySize(key, val));
stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
}
if (this.flushListener != null && !this.flushRequested &&
size > this.memcacheFlushSize) {
// Request a cache flush
this.flushListener.flushRequested(this);
this.flushRequested = true;
}
flush = this.flushListener != null && !this.flushRequested &&
size > this.memcacheFlushSize;
} finally {
this.updatesLock.readLock().unlock();
}
if (flush) {
// Request a cache flush. Do it outside update lock.
this.flushListener.request(this);
this.flushRequested = true;
}
}
@ -1597,11 +1604,11 @@ public class HRegion implements HConstants {
*/
long obtainRowLock(Text row) throws IOException {
checkRow(row);
lock.readLock().lock();
splitsAndClosesLock.readLock().lock();
try {
if (this.closed.get()) {
throw new IOException("Region " + this.getRegionName().toString() +
" closed");
throw new NotServingRegionException("Region " +
this.getRegionName().toString() + " closed");
}
synchronized (rowsToLocks) {
while (rowsToLocks.get(row) != null) {
@ -1618,7 +1625,7 @@ public class HRegion implements HConstants {
return lid.longValue();
}
} finally {
lock.readLock().unlock();
splitsAndClosesLock.readLock().unlock();
}
}

View File

@ -1264,8 +1264,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return this.requestCount;
}
/** @return reference to CacheFlushListener */
public CacheFlushListener getCacheFlushListener() {
/** @return reference to FlushRequester */
public FlushRequester getFlushRequester() {
return this.cacheFlusher;
}

View File

@ -24,12 +24,12 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -41,30 +41,29 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.BloomFilterDescriptor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TextSequence;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.BloomFilterDescriptor;
import org.onelab.filter.BloomFilter;
import org.onelab.filter.CountingBloomFilter;
import org.onelab.filter.Filter;
import org.onelab.filter.RetouchedBloomFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* HStore maintains a bunch of data files. It is responsible for maintaining
* the memory/file hierarchy and for periodic flushes to disk and compacting
@ -102,7 +101,6 @@ public class HStore implements HConstants {
private final Integer flushLock = new Integer(0);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final AtomicInteger activeScanners = new AtomicInteger(0);
final Text storeName;
@ -110,7 +108,7 @@ public class HStore implements HConstants {
* Sorted Map of readers keyed by sequence id (Most recent should be last in
* in list).
*/
final SortedMap<Long, HStoreFile> storefiles =
private final SortedMap<Long, HStoreFile> storefiles =
Collections.synchronizedSortedMap(new TreeMap<Long, HStoreFile>());
/*
@ -128,9 +126,8 @@ public class HStore implements HConstants {
private final Path compactionDir;
private final Integer compactLock = new Integer(0);
private final int compactionThreshold;
private final ReentrantReadWriteLock newScannerLock =
new ReentrantReadWriteLock();
private final Set<ChangedReadersObserver> changedReaderObservers =
Collections.synchronizedSet(new HashSet<ChangedReadersObserver>());
/**
* An HStore is a set of zero or more MapFiles, which stretch backwards over
@ -563,7 +560,9 @@ public class HStore implements HConstants {
for (MapFile.Reader reader: this.readers.values()) {
reader.close();
}
result = new ArrayList<HStoreFile>(storefiles.values());
synchronized (this.storefiles) {
result = new ArrayList<HStoreFile>(storefiles.values());
}
LOG.debug("closed " + this.storeName);
return result;
} finally {
@ -659,26 +658,68 @@ public class HStore implements HConstants {
}
// D. Finally, make the new MapFile available.
this.lock.writeLock().lock();
try {
Long flushid = Long.valueOf(logCacheFlushId);
// Open the map file reader.
this.readers.put(flushid,
flushedFile.getReader(this.fs, this.bloomFilter));
this.storefiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
" with " + entries +
" entries, sequence id " + logCacheFlushId + ", data size " +
StringUtils.humanReadableInt(flushed) + ", file size " +
StringUtils.humanReadableInt(newStoreSize));
}
} finally {
this.lock.writeLock().unlock();
updateReaders(logCacheFlushId, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
" with " + entries +
" entries, sequence id " + logCacheFlushId + ", data size " +
StringUtils.humanReadableInt(flushed) + ", file size " +
StringUtils.humanReadableInt(newStoreSize));
}
}
return flushed;
}
/*
* Change readers adding into place the Reader produced by this new flush.
* @param logCacheFlushId
* @param flushedFile
* @throws IOException
*/
private void updateReaders(final long logCacheFlushId,
final HStoreFile flushedFile)
throws IOException {
this.lock.writeLock().lock();
try {
Long flushid = Long.valueOf(logCacheFlushId);
// Open the map file reader.
this.readers.put(flushid,
flushedFile.getReader(this.fs, this.bloomFilter));
this.storefiles.put(flushid, flushedFile);
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
} finally {
this.lock.writeLock().unlock();
}
}
/*
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
private void notifyChangedReadersObservers() throws IOException {
synchronized (this.changedReaderObservers) {
for (ChangedReadersObserver o: this.changedReaderObservers) {
o.updateReaders();
}
}
}
/*
* @param o Observer who wants to know about changes in set of Readers
*/
void addChangedReaderObserver(ChangedReadersObserver o) {
this.changedReaderObservers.add(o);
}
/*
* @param o Observer no longer interested in changes in set of Readers.
*/
void deleteChangedReaderObserver(ChangedReadersObserver o) {
if (!this.changedReaderObservers.remove(o)) {
LOG.warn("Not in set" + o);
}
}
//////////////////////////////////////////////////////////////////////////////
// Compaction
@ -724,12 +765,6 @@ public class HStore implements HConstants {
return checkSplit();
}
if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + filesToCompact.size() +
" files " + filesToCompact.toString() + " into " +
compactionDir.toUri().getPath());
}
// Storefiles are keyed by sequence id. The oldest file comes first.
// We need to return out of here a List that has the newest file first.
Collections.reverse(filesToCompact);
@ -737,15 +772,20 @@ public class HStore implements HConstants {
// The max-sequenceID in any of the to-be-compacted TreeMaps is the
// last key of storefiles.
maxId = this.storefiles.lastKey();
maxId = this.storefiles.lastKey().longValue();
}
// Step through them, writing to the brand-new MapFile
HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
this.compactionDir, info.getEncodedName(), family.getFamilyName(),
-1L, null);
if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + filesToCompact.size() +
" files " + filesToCompact.toString() + " into " +
FSUtils.getPath(compactedOutputFile.getMapFilePath()));
}
MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
this.compression, this.bloomFilter);
this.compression, this.bloomFilter);
try {
compactHStoreFiles(compactedOut, filesToCompact);
} finally {
@ -955,112 +995,92 @@ public class HStore implements HConstants {
*
* <p>Moving the compacted TreeMap into place means:
* <pre>
* 1) Wait for active scanners to exit
* 2) Acquiring the write-lock
* 3) Moving the new compacted MapFile into place
* 4) Unloading all the replaced MapFiles and close.
* 5) Deleting all the replaced MapFile files.
* 6) Loading the new TreeMap.
* 7) Compute new store size
* 8) Releasing the write-lock
* 9) Allow new scanners to proceed.
* 1) Moving the new compacted MapFile into place
* 2) Unload all replaced MapFiles, close and collect list to delete.
* 3) Loading the new TreeMap.
* 4) Compute new store size
* </pre>
*
* @param compactedFiles list of files that were compacted
* @param compactedFile HStoreFile that is the result of the compaction
* @throws IOException
*/
private void completeCompaction(List<HStoreFile> compactedFiles,
HStoreFile compactedFile) throws IOException {
// 1. Wait for active scanners to exit
newScannerLock.writeLock().lock(); // prevent new scanners
private void completeCompaction(final List<HStoreFile> compactedFiles,
final HStoreFile compactedFile)
throws IOException {
this.lock.writeLock().lock();
try {
synchronized (activeScanners) {
while (activeScanners.get() != 0) {
try {
activeScanners.wait();
} catch (InterruptedException e) {
// continue
}
}
// 2. Acquiring the HStore write-lock
this.lock.writeLock().lock();
// 1. Moving the new MapFile into place.
HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
info.getEncodedName(), family.getFamilyName(), -1, null);
if (LOG.isDebugEnabled()) {
LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
" to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
}
if (!compactedFile.rename(this.fs, finalCompactedFile)) {
LOG.error("Failed move of compacted file " +
finalCompactedFile.getMapFilePath().toString());
return;
}
try {
// 3. Moving the new MapFile into place.
HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
info.getEncodedName(), family.getFamilyName(), -1, null);
if(LOG.isDebugEnabled()) {
LOG.debug("moving " +
FSUtils.getPath(compactedFile.getMapFilePath()) +
" to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
}
if (!compactedFile.rename(this.fs, finalCompactedFile)) {
LOG.error("Failed move of compacted file " +
finalCompactedFile.getMapFilePath().toString());
return;
// 2. Unload all replaced MapFiles, close and collect list to delete.
synchronized (storefiles) {
Map<Long, HStoreFile> toDelete = new HashMap<Long, HStoreFile>();
for (Map.Entry<Long, HStoreFile> e : this.storefiles.entrySet()) {
if (!compactedFiles.contains(e.getValue())) {
continue;
}
Long key = e.getKey();
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
toDelete.put(key, e.getValue());
}
// 4. and 5. Unload all the replaced MapFiles, close and delete.
synchronized (storefiles) {
List<Long> toDelete = new ArrayList<Long>();
for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
if (!compactedFiles.contains(e.getValue())) {
continue;
}
Long key = e.getKey();
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
toDelete.add(key);
try {
// 3. Loading the new TreeMap.
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
for (Long key : toDelete.keySet()) {
this.storefiles.remove(key);
}
try {
for (Long key: toDelete) {
HStoreFile hsf = this.storefiles.remove(key);
hsf.delete();
}
// 6. Loading the new TreeMap.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal,
// Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs, this.bloomFilter,
family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files for " + this.storeName +
". Compacted file is " + finalCompactedFile.toString() +
". Files replaced are " + compactedFiles.toString() +
" some of which may have been already removed", e);
}
// 7. Compute new store size
storeSize = 0L;
for (HStoreFile hsf: storefiles.values()) {
storeSize += hsf.length();
// Add new compacted Reader and store file.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal,
// Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs, this.bloomFilter, family
.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
// Tell observers that list of Readers has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
for (HStoreFile hsf : toDelete.values()) {
hsf.delete();
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files for " + this.storeName +
". Compacted file is " + finalCompactedFile.toString() +
". Files replaced are " + compactedFiles.toString() +
" some of which may have been already removed", e);
}
// 4. Compute new store size
storeSize = 0L;
for (HStoreFile hsf : storefiles.values()) {
storeSize += hsf.length();
}
} finally {
// 8. Releasing the write-lock
this.lock.writeLock().unlock();
}
} finally {
// 9. Allow new scanners to proceed.
newScannerLock.writeLock().unlock();
this.lock.writeLock().unlock();
}
}
//////////////////////////////////////////////////////////////////////////////
// Accessors.
// ////////////////////////////////////////////////////////////////////////////
// Accessors.
// (This is the only section that is directly useful!)
//////////////////////////////////////////////////////////////////////////////
@ -1635,20 +1655,13 @@ public class HStore implements HConstants {
* Return a scanner for both the memcache and the HStore files
*/
InternalScanner getScanner(long timestamp, Text targetCols[],
Text firstRow, RowFilterInterface filter) throws IOException {
newScannerLock.readLock().lock(); // ability to create a new
// scanner during a compaction
Text firstRow, RowFilterInterface filter)
throws IOException {
lock.readLock().lock();
try {
lock.readLock().lock(); // lock HStore
try {
return new HStoreScanner(this, targetCols, firstRow, timestamp, filter);
} finally {
lock.readLock().unlock();
}
return new HStoreScanner(this, targetCols, firstRow, timestamp, filter);
} finally {
newScannerLock.readLock().unlock();
lock.readLock().unlock();
}
}
@ -1689,20 +1702,15 @@ public class HStore implements HConstants {
}
return m.groupCount() > 1 && m.group(2) != null;
}
protected void updateActiveScanners() {
synchronized (activeScanners) {
int numberOfScanners = activeScanners.decrementAndGet();
if (numberOfScanners < 0) {
LOG.error(storeName +
" number of active scanners less than zero: " +
numberOfScanners + " resetting to zero");
activeScanners.set(0);
numberOfScanners = 0;
}
if (numberOfScanners == 0) {
activeScanners.notifyAll();
}
/**
* @return Current list of store files.
*/
SortedMap<Long, HStoreFile> getStorefiles() {
synchronized (this.storefiles) {
SortedMap<Long, HStoreFile> copy =
new TreeMap<Long, HStoreFile>(this.storefiles);
return copy;
}
}
}
}

View File

@ -66,7 +66,6 @@ class HStoreScanner implements InternalScanner {
try {
scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
for (int i = 0; i < scanners.length; i++) {
if (scanners[i].isWildcardScanner()) {
this.wildcardMatch = true;
@ -75,7 +74,6 @@ class HStoreScanner implements InternalScanner {
this.multipleMatchers = true;
}
}
} catch(IOException e) {
for (int i = 0; i < this.scanners.length; i++) {
if(scanners[i] != null) {
@ -87,7 +85,6 @@ class HStoreScanner implements InternalScanner {
// Advance to the first key in each scanner.
// All results will match the required column-set and scanTime.
for (int i = 0; i < scanners.length; i++) {
keys[i] = new HStoreKey();
resultSets[i] = new TreeMap<Text, byte []>();
@ -95,9 +92,6 @@ class HStoreScanner implements InternalScanner {
closeScanner(i);
}
}
// As we have now successfully completed initialization, increment the
// activeScanner count.
store.activeScanners.incrementAndGet();
}
/** @return true if the scanner is a wild card scanner */
@ -265,18 +259,13 @@ class HStoreScanner implements InternalScanner {
/** {@inheritDoc} */
public void close() {
try {
for(int i = 0; i < scanners.length; i++) {
if(scanners[i] != null) {
closeScanner(i);
}
}
} finally {
store.updateActiveScanners();
}
}
/** {@inheritDoc} */
public Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
throw new UnsupportedOperationException("Unimplemented serverside. " +
"next(HStoreKey, StortedMap(...) is more efficient");

View File

@ -22,56 +22,40 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.SortedMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
/**
* A scanner that iterates through HStore files
*/
class StoreFileScanner extends HAbstractScanner {
class StoreFileScanner extends HAbstractScanner
implements ChangedReadersObserver {
// Keys retrieved from the sources
private HStoreKey keys[];
// Values that correspond to those keys
private byte [][] vals;
// Readers we go against.
private MapFile.Reader[] readers;
private HStore store;
// Store this scanner came out of.
private final HStore store;
// Used around replacement of Readers if they change while we're scanning.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public StoreFileScanner(final HStore store, final long timestamp,
final Text[] targetCols, final Text firstRow)
throws IOException {
super(timestamp, targetCols);
this.store = store;
this.store.addChangedReaderObserver(this);
try {
this.readers = new MapFile.Reader[store.storefiles.size()];
// Most recent map file should be first
int i = readers.length - 1;
for(HStoreFile curHSF: store.storefiles.values()) {
readers[i--] = curHSF.getReader(store.fs, store.bloomFilter);
}
this.keys = new HStoreKey[readers.length];
this.vals = new byte[readers.length][];
// Advance the readers to the first pos.
for(i = 0; i < readers.length; i++) {
keys[i] = new HStoreKey();
if(firstRow.getLength() != 0) {
if(findFirstRow(i, firstRow)) {
continue;
}
}
while(getNext(i)) {
if(columnMatch(i)) {
break;
}
}
}
openReaders(firstRow);
} catch (Exception ex) {
close();
IOException e = new IOException("HStoreScanner failed construction");
@ -80,6 +64,46 @@ class StoreFileScanner extends HAbstractScanner {
}
}
/*
* Go open new Reader iterators and cue them at <code>firstRow</code>.
* Closes existing Readers if any.
* @param firstRow
* @throws IOException
*/
private void openReaders(final Text firstRow) throws IOException {
if (this.readers != null) {
for (int i = 0; i < this.readers.length; i++) {
this.readers[i].close();
}
}
// Open our own copies of the Readers here inside in the scanner.
this.readers = new MapFile.Reader[this.store.getStorefiles().size()];
// Most recent map file should be first
int i = readers.length - 1;
for(HStoreFile curHSF: store.getStorefiles().values()) {
readers[i--] = curHSF.getReader(store.fs, store.bloomFilter);
}
this.keys = new HStoreKey[readers.length];
this.vals = new byte[readers.length][];
// Advance the readers to the first pos.
for (i = 0; i < readers.length; i++) {
keys[i] = new HStoreKey();
if (firstRow.getLength() != 0) {
if (findFirstRow(i, firstRow)) {
continue;
}
}
while (getNext(i)) {
if (columnMatch(i)) {
break;
}
}
}
}
/**
* For a particular column i, find all the matchers defined for the column.
* Compare the column family and column key using the matchers. The first one
@ -107,72 +131,104 @@ class StoreFileScanner extends HAbstractScanner {
@Override
public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
throws IOException {
if (scannerClosed) {
if (this.scannerClosed) {
return false;
}
// Find the next row label (and timestamp)
Text chosenRow = null;
long chosenTimestamp = -1;
this.lock.readLock().lock();
try {
// Find the next viable row label (and timestamp).
ViableRow viableRow = getNextViableRow();
// Grab all the values that match this row/timestamp
boolean insertedItem = false;
if (viableRow.getRow() != null) {
key.setRow(viableRow.getRow());
key.setVersion(viableRow.getTimestamp());
key.setColumn(new Text(""));
for (int i = 0; i < keys.length; i++) {
// Fetch the data
while ((keys[i] != null)
&& (keys[i].getRow().compareTo(viableRow.getRow()) == 0)) {
// If we are doing a wild card match or there are multiple matchers
// per column, we need to scan all the older versions of this row
// to pick up the rest of the family members
if(!isWildcardScanner()
&& !isMultipleMatchScanner()
&& (keys[i].getTimestamp() != viableRow.getTimestamp())) {
break;
}
if(columnMatch(i)) {
// We only want the first result for any specific family member
if(!results.containsKey(keys[i].getColumn())) {
results.put(new Text(keys[i].getColumn()), vals[i]);
insertedItem = true;
}
}
if (!getNext(i)) {
closeSubScanner(i);
}
}
// Advance the current scanner beyond the chosen row, to
// a valid timestamp, so we're ready next time.
while ((keys[i] != null)
&& ((keys[i].getRow().compareTo(viableRow.getRow()) <= 0)
|| (keys[i].getTimestamp() > this.timestamp)
|| (! columnMatch(i)))) {
getNext(i);
}
}
}
return insertedItem;
} finally {
this.lock.readLock().unlock();
}
}
// Data stucture to hold next, viable row (and timestamp).
class ViableRow {
private final Text row;
private final long ts;
ViableRow(final Text r, final long t) {
this.row = r;
this.ts = t;
}
public Text getRow() {
return this.row;
}
public long getTimestamp() {
return this.ts;
}
}
/*
* @return An instance of <code>ViableRow</code>
* @throws IOException
*/
private ViableRow getNextViableRow() throws IOException {
// Find the next viable row label (and timestamp).
Text viableRow = null;
long viableTimestamp = -1;
for(int i = 0; i < keys.length; i++) {
if((keys[i] != null)
&& (columnMatch(i))
&& (keys[i].getTimestamp() <= this.timestamp)
&& ((chosenRow == null)
|| (keys[i].getRow().compareTo(chosenRow) < 0)
|| ((keys[i].getRow().compareTo(chosenRow) == 0)
&& (keys[i].getTimestamp() > chosenTimestamp)))) {
chosenRow = new Text(keys[i].getRow());
chosenTimestamp = keys[i].getTimestamp();
&& ((viableRow == null)
|| (keys[i].getRow().compareTo(viableRow) < 0)
|| ((keys[i].getRow().compareTo(viableRow) == 0)
&& (keys[i].getTimestamp() > viableTimestamp)))) {
viableRow = new Text(keys[i].getRow());
viableTimestamp = keys[i].getTimestamp();
}
}
// Grab all the values that match this row/timestamp
boolean insertedItem = false;
if(chosenRow != null) {
key.setRow(chosenRow);
key.setVersion(chosenTimestamp);
key.setColumn(new Text(""));
for(int i = 0; i < keys.length; i++) {
// Fetch the data
while((keys[i] != null)
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
// If we are doing a wild card match or there are multiple matchers
// per column, we need to scan all the older versions of this row
// to pick up the rest of the family members
if(!isWildcardScanner()
&& !isMultipleMatchScanner()
&& (keys[i].getTimestamp() != chosenTimestamp)) {
break;
}
if(columnMatch(i)) {
// We only want the first result for any specific family member
if(!results.containsKey(keys[i].getColumn())) {
results.put(new Text(keys[i].getColumn()), vals[i]);
insertedItem = true;
}
}
if(!getNext(i)) {
closeSubScanner(i);
}
}
// Advance the current scanner beyond the chosen row, to
// a valid timestamp, so we're ready next time.
while((keys[i] != null)
&& ((keys[i].getRow().compareTo(chosenRow) <= 0)
|| (keys[i].getTimestamp() > this.timestamp)
|| (! columnMatch(i)))) {
getNext(i);
}
}
}
return insertedItem;
return new ViableRow(viableRow, viableTimestamp);
}
/**
@ -242,7 +298,8 @@ class StoreFileScanner extends HAbstractScanner {
/** Shut it down! */
public void close() {
if(! scannerClosed) {
if (!this.scannerClosed) {
this.store.deleteChangedReaderObserver(this);
try {
for(int i = 0; i < readers.length; i++) {
if(readers[i] != null) {
@ -255,8 +312,23 @@ class StoreFileScanner extends HAbstractScanner {
}
} finally {
scannerClosed = true;
this.scannerClosed = true;
}
}
}
// Implementation of ChangedReadersObserver
public void updateReaders() throws IOException {
this.lock.writeLock().lock();
try {
// The keys are currently lined up at the next row to fetch. Pass in
// the current row as 'first' row and readers will be opened and cue'd
// up so future call to next will start here.
ViableRow viableRow = getNextViableRow();
openReaders(viableRow.getRow());
LOG.debug("Replaced Scanner Readers at row " + viableRow.getRow());
} finally {
this.lock.writeLock().unlock();
}
}
}

View File

@ -127,7 +127,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
assertNotNull(r);
// Flush the cache
server.getCacheFlushListener().flushRequested(r);
server.getFlushRequester().request(r);
// Now, wait until split makes it into the meta table.
int oldCount = count;