The core elements of HBASE-2037: refactoring flushing, and adding configurability of which HRegion subclass is instantiated

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@944527 13f79535-47bb-0310-9956-ffa450edef68
Ryan Rawson 2010-05-15 00:16:40 +00:00
parent 8e95593f42
commit e593f0efbf
10 changed files with 646 additions and 86 deletions

HConstants.java

@@ -1,5 +1,5 @@
/**
* Copyright 2007 The Apache Software Foundation
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -286,6 +286,8 @@ public interface HConstants {
*/
public static int RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };
public static final String REGION_IMPL = "hbase.hregion.impl";
/** modifyTable op for replacing the table descriptor */
public static enum Modify {
CLOSE_REGION,
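
The new REGION_IMPL constant is the key to the configurability half of this change: HRegion.newHRegion() (added below) reads it to decide which class to construct. A minimal sketch of how a deployment could plug in a subclass, assuming a hypothetical MyRegion that extends HRegion and declares the required six-argument constructor:

// Hypothetical example; MyRegion is not part of this commit. Any HRegion
// subclass with the matching (Path, HLog, FileSystem, Configuration,
// HRegionInfo, FlushRequester) constructor can be named here, or set via
// hbase-site.xml.
Configuration conf = new HBaseConfiguration();
conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);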

HMerge.java

@@ -19,12 +19,6 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -44,6 +38,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
/**
* A non-instantiable class that has a static method capable of compacting
* a table by merging adjacent regions.
@@ -152,12 +152,12 @@ class HMerge implements HConstants {
for (int i = 0; i < info.length - 1; i++) {
if (currentRegion == null) {
currentRegion =
new HRegion(tabledir, hlog, fs, conf, info[i], null);
HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null);
currentRegion.initialize(null, null);
currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion =
new HRegion(tabledir, hlog, fs, conf, info[i + 1], null);
HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null);
nextRegion.initialize(null, null);
nextSize = nextRegion.getLargestHStoreSize();
@@ -326,7 +326,7 @@ class HMerge implements HConstants {
// Scan root region to find all the meta regions
root = new HRegion(rootTableDir, hlog, fs, conf,
root = HRegion.newHRegion(rootTableDir, hlog, fs, conf,
HRegionInfo.ROOT_REGIONINFO, null);
root.initialize(null, null);

HRegion.java

@@ -19,22 +19,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -70,6 +54,23 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more HRegions.
@@ -238,7 +239,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
/**
* HRegion constructor.
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
* {@link org.apache.hadoop.hbase.regionserver.HRegion#newHRegion(org.apache.hadoop.fs.Path, HLog, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method.
*
*
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
@@ -256,6 +260,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @param flushListener an object that implements CacheFlushListener; may be
* null.
*
* @see org.apache.hadoop.hbase.regionserver.HRegion#newHRegion(org.apache.hadoop.fs.Path, HLog, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
*/
public HRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener) {
@@ -686,10 +693,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// Create a region instance and then move the splits into place under
// regionA and regionB.
HRegion regionA =
new HRegion(basedir, log, fs, conf, regionAInfo, null);
HRegion.newHRegion(basedir, log, fs, conf, regionAInfo, null);
moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
HRegion regionB =
new HRegion(basedir, log, fs, conf, regionBInfo, null);
HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
HRegion regions[] = new HRegion [] {regionA, regionB};
@@ -927,7 +934,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
private boolean internalFlushcache() throws IOException {
protected boolean internalFlushcache() throws IOException {
final long startTime = System.currentTimeMillis();
// Clear flush flag.
// Record latest flush time
@@ -954,12 +961,19 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
this.updatesLock.writeLock().lock();
// Get current size of memstores.
final long currentMemStoreSize = this.memstoreSize.get();
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
try {
for (Store s: stores.values()) {
s.snapshot();
}
sequenceId = log.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
for (Store s : stores.values()) {
storeFlushers.add(s.getStoreFlusher(completeSequenceId));
}
// prepare flush (take a snapshot)
for (StoreFlusher flusher : storeFlushers) {
flusher.prepare();
}
} finally {
this.updatesLock.writeLock().unlock();
}
@@ -973,12 +987,25 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// A. Flush memstore to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
for (Store hstore: stores.values()) {
boolean needsCompaction = hstore.flushCache(completeSequenceId);
for (StoreFlusher flusher : storeFlushers) {
flusher.flushCache();
}
internalPreFlushcacheCommit();
/*
* Switch between memstore and the new store file(s).
*/
for (StoreFlusher flusher : storeFlushers) {
boolean needsCompaction = flusher.commit();
if (needsCompaction) {
compactionRequested = true;
}
}
storeFlushers.clear();
// Set down the memstore size by amount of flush.
this.memstoreSize.addAndGet(-currentMemStoreSize);
} catch (Throwable t) {
@@ -1022,6 +1049,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return compactionRequested;
}
/**
* A hook for sub-classes wishing to perform operations prior to the
* cache flush commit stage.
*
* @throws IOException allow children to throw exception
*/
protected void internalPreFlushcacheCommit() throws IOException {
}
/**
* Get the sequence number to be associated with this cache flush. Used by
* TransactionalRegion to not complete pending transactions.
@@ -1117,13 +1153,18 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
scan.addFamily(family);
}
}
return new RegionScanner(scan, additionalScanners);
return instantiateInternalScanner(scan, additionalScanners);
} finally {
newScannerLock.readLock().unlock();
}
}
protected InternalScanner instantiateInternalScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
return new RegionScanner(scan, additionalScanners);
}
//////////////////////////////////////////////////////////////////////////////
// set() methods for client use.
//////////////////////////////////////////////////////////////////////////////
@@ -1976,6 +2017,45 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
// Utility methods
/**
* A utility method to create new instances of HRegion based on the
* {@link org.apache.hadoop.hbase.HConstants#REGION_IMPL} configuration
* property.
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
* The log file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
* @param conf is global configuration settings.
* @param regionInfo HRegionInfo that describes the region
* @param flushListener an object that implements CacheFlushListener; may be
* null.
* @return the new instance
*/
public static HRegion newHRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener) {
try {
@SuppressWarnings("unchecked")
Class<? extends HRegion> regionClass =
(Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
Constructor<? extends HRegion> c =
regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
Configuration.class, HRegionInfo.class, FlushRequester.class);
return c.newInstance(basedir, log, fs, conf, regionInfo, flushListener);
} catch (Throwable e) {
// todo: what should I throw here?
throw new IllegalStateException("Could not instantiate a region instance.", e);
}
}
/**
* Convenience method creating new HRegions. Used by createTable and by the
@@ -1998,7 +2078,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
HRegion region = new HRegion(tableDir,
HRegion region = HRegion.newHRegion(tableDir,
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME),
new Path(regionDir, HREGION_OLDLOGDIR_NAME), conf, null),
fs, conf, info, null);
@@ -2028,7 +2108,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
if (info == null) {
throw new NullPointerException("Passed region info is null");
}
HRegion r = new HRegion(
HRegion r = HRegion.newHRegion(
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
log, FileSystem.get(conf), conf, info, null);
r.initialize(null, null);
@@ -2335,7 +2415,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null);
dstRegion.initialize(null, null);
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
@@ -2592,9 +2672,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
// Currently expects tables have one region only.
if (p.getName().startsWith(rootStr)) {
region = new HRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
} else if (p.getName().startsWith(metaStr)) {
region = new HRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
null);
} else {
throw new IOException("Not a known catalog table: " + p.toString());
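
The refactoring above exposes three extension points: the now-documented public constructor, the internalPreFlushcacheCommit() hook, and the instantiateInternalScanner() factory. A minimal sketch of the kind of subclass they enable (class name and method bodies are illustrative only, not part of this commit):

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

public class MyRegion extends HRegion {
  // Must match the constructor signature that newHRegion() looks up
  // reflectively.
  public MyRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
      HRegionInfo regionInfo, FlushRequester flushListener) {
    super(basedir, log, fs, conf, regionInfo, flushListener);
  }

  @Override
  protected void internalPreFlushcacheCommit() throws IOException {
    // Runs after the new store files have been written but before they are
    // committed and the memstore snapshots are cleared.
  }

  @Override
  protected InternalScanner instantiateInternalScanner(Scan scan,
      List<KeyValueScanner> additionalScanners) throws IOException {
    // Wrap or replace the default RegionScanner here.
    return super.instantiateInternalScanner(scan, additionalScanners);
  }
}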

HRegionServer.java

@@ -1520,7 +1520,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
HRegion r = HRegion.newHRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
.getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
this.cacheFlusher);
r.initialize(null, new Progressable() {

MemStore.java

@@ -116,7 +116,7 @@ public class MemStore implements HeapSize {
/**
* Creates a snapshot of the current memstore.
* Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
* Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet<KeyValue>)}
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
@@ -156,7 +156,7 @@ public class MemStore implements HeapSize {
* call to {@link #snapshot()}
* @return Return snapshot.
* @see {@link #snapshot()}
* @see {@link #clearSnapshot(java.util.Map)}
* @see {@link #clearSnapshot(SortedSet<KeyValue>)}
*/
KeyValueSkipListSet getSnapshot() {
return this.snapshot;
@@ -168,7 +168,7 @@ public class MemStore implements HeapSize {
* @throws UnexpectedException
* @see {@link #snapshot()}
*/
void clearSnapshot(final KeyValueSkipListSet ss)
void clearSnapshot(final SortedSet<KeyValue> ss)
throws UnexpectedException {
this.lock.writeLock().lock();
try {

ScanDeleteTracker.java

@@ -104,7 +104,7 @@ public class ScanDeleteTracker implements DeleteTracker {
@Override
public boolean isDeleted(byte [] buffer, int qualifierOffset,
int qualifierLength, long timestamp) {
if (timestamp < familyStamp) {
if (timestamp <= familyStamp) {
return true;
}
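
The operator change from < to <= means a family-wide delete at timestamp T now also masks cells written at exactly T. A hedged illustration at the region level, reusing the put/delete calls that appear in the tests below (region and table setup elided; `region`, `row`, `family`, `qual1`, and `value1` are assumed to be in scope):

Put put = new Put(row);
put.add(family, qual1, 100L, value1);         // cell at t=100
region.put(put);
Delete delete = new Delete(row, 100L, null);  // family-wide delete at t=100
region.delete(delete, null, true);
// Previously a scan could still surface the t=100 cell; with the
// `timestamp <= familyStamp` check it is treated as deleted.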

Store.java

@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -380,8 +381,11 @@ public class Store implements HConstants, HeapSize {
if (maxSeqIdInLog > -1) {
// We read some edits, so we should flush the memstore
this.snapshot();
boolean needCompaction = this.flushCache(maxSeqIdInLog);
StoreFlusher flusher = getStoreFlusher(maxSeqIdInLog);
flusher.prepare();
flusher.flushCache();
boolean needCompaction = flusher.commit();
if (needCompaction) {
this.compact(false);
}
@@ -499,7 +503,7 @@
/**
* Snapshot this store's memstore. Call before running
* {@link #flushCache(long)} so it has some work to do.
* {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
*/
void snapshot() {
this.memstore.snapshot();
@@ -512,21 +516,13 @@
* @return true if a compaction is needed
* @throws IOException
*/
boolean flushCache(final long logCacheFlushId) throws IOException {
// Get the snapshot to flush. Presumes that a call to
// this.memstore.snapshot() has happened earlier up in the chain.
KeyValueSkipListSet snapshot = this.memstore.getSnapshot();
private StoreFile flushCache(final long logCacheFlushId,
SortedSet<KeyValue> snapshot) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
StoreFile sf = internalFlushCache(snapshot, logCacheFlushId);
if (sf == null) {
return false;
}
// Add new file to store files. Clear snapshot too while we have the
// Store write lock.
int size = updateStorefiles(logCacheFlushId, sf, snapshot);
return size >= this.compactionThreshold;
return internalFlushCache(snapshot, logCacheFlushId);
}
/*
@@ -535,7 +531,7 @@
* @return StoreFile created.
* @throws IOException
*/
private StoreFile internalFlushCache(final KeyValueSkipListSet set,
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
final long logCacheFlushId)
throws IOException {
HFile.Writer writer = null;
@@ -605,20 +601,18 @@
* @param sf
* @param set That was used to make the passed file <code>p</code>.
* @throws IOException
* @return Count of store files.
* @return Whether compaction is required.
*/
private int updateStorefiles(final long logCacheFlushId,
final StoreFile sf, final KeyValueSkipListSet set)
private boolean updateStorefiles(final long logCacheFlushId,
final StoreFile sf, final SortedSet<KeyValue> set)
throws IOException {
int count = 0;
this.lock.writeLock().lock();
try {
this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
count = this.storefiles.size();
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
this.memstore.clearSnapshot(set);
return count;
return this.storefiles.size() >= this.compactionThreshold;
} finally {
this.lock.writeLock().unlock();
}
@@ -1513,6 +1507,42 @@
}
}
public StoreFlusher getStoreFlusher(long cacheFlushId) {
return new StoreFlusherImpl(cacheFlushId);
}
private class StoreFlusherImpl implements StoreFlusher {
private long cacheFlushId;
private SortedSet<KeyValue> snapshot;
private StoreFile storeFile;
private StoreFlusherImpl(long cacheFlushId) {
this.cacheFlushId = cacheFlushId;
}
@Override
public void prepare() {
memstore.snapshot();
this.snapshot = memstore.getSnapshot();
}
@Override
public void flushCache() throws IOException {
storeFile = Store.this.flushCache(cacheFlushId, snapshot);
}
@Override
public boolean commit() throws IOException {
if (storeFile == null) {
return false;
}
// Add new file to store files. Clear snapshot too while we have
// the Store write lock.
return Store.this.updateStorefiles(cacheFlushId, storeFile, snapshot);
}
}
/**
* See if there are too many store files in this store
* @return true if number of store files is greater than

StoreFlusher.java (new file)

@@ -0,0 +1,63 @@
/*
* Copyright 2010 The Apache Software Foundation
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
/**
* A package-protected interface for store flushing.
* A store flusher carries the state required to prepare/flush/commit the
* store's cache.
*/
interface StoreFlusher {
/**
* Prepare for a store flush (create snapshot)
*
* Requires pausing writes.
*
* A very short operation.
*/
void prepare();
/**
* Flush the cache (create the new store file)
*
* A lengthy operation which doesn't require locking out any function
* of the store.
*
* @throws IOException in case the flush fails
*/
void flushCache() throws IOException;
/**
* Commit the flush - add the store file to the store and clear the
* memstore snapshot.
*
* Requires pausing scans.
*
* A very short operation.
*
* @return whether compaction is required
* @throws IOException
*/
boolean commit() throws IOException;
}
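
The interface splits what used to be a single Store.flushCache() call into a three-phase protocol, so that only prepare() and commit() need region-level locks while the long write runs lock-free. A minimal caller sketch, mirroring the flushStore() helper added to TestStore below (`store` and `seqId` are assumed to be in scope):

StoreFlusher flusher = store.getStoreFlusher(seqId);
flusher.prepare();     // short: snapshot the memstore (caller holds the updates write lock)
flusher.flushCache();  // long: write the snapshot to a new store file, no locks held
boolean needsCompaction = flusher.commit();  // short: publish the file, clear the snapshot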

TestHRegion.java

@@ -1,5 +1,5 @@
/**
* Copyright 2007 The Apache Software Foundation
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,13 +19,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -41,13 +34,26 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
* Basic stand-alone testing of HRegion.
@@ -64,13 +70,13 @@ public class TestHRegion extends HBaseTestCase {
private final int MAX_VERSIONS = 2;
// Test names
private final byte[] tableName = Bytes.toBytes("testtable");;
private final byte[] qual1 = Bytes.toBytes("qual1");
private final byte[] qual2 = Bytes.toBytes("qual2");
private final byte[] qual3 = Bytes.toBytes("qual3");
private final byte[] value1 = Bytes.toBytes("value1");
private final byte[] value2 = Bytes.toBytes("value2");
private final byte [] row = Bytes.toBytes("rowA");
protected final byte[] tableName = Bytes.toBytes("testtable");
protected final byte[] qual1 = Bytes.toBytes("qual1");
protected final byte[] qual2 = Bytes.toBytes("qual2");
protected final byte[] qual3 = Bytes.toBytes("qual3");
protected final byte[] value1 = Bytes.toBytes("value1");
protected final byte[] value2 = Bytes.toBytes("value2");
protected final byte [] row = Bytes.toBytes("rowA");
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1799,6 +1805,378 @@ public class TestHRegion extends HBaseTestCase {
}
}
/**
* Flushes the cache in a thread while scanning. The tests verify that the
* scan is coherent - e.g. the returned results are always from the same or
* a later update than the previous results.
* @throws IOException scan / compact
* @throws InterruptedException thread join
*/
public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
byte[] tableName = Bytes.toBytes("testFlushCacheWhileScanning");
byte[] family = Bytes.toBytes("family");
int numRows = 1000;
int flushAndScanInterval = 10;
int compactInterval = 10 * flushAndScanInterval;
String method = "testFlushCacheWhileScanning";
initHRegion(tableName, method, family);
FlushThread flushThread = new FlushThread();
flushThread.start();
Scan scan = new Scan();
scan.addFamily(family);
scan.setFilter(new SingleColumnValueFilter(family, qual1,
CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L))));
int expectedCount = 0;
List<KeyValue> res = new ArrayList<KeyValue>();
boolean toggle=true;
for (long i = 0; i < numRows; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(family, qual1, Bytes.toBytes(i % 10));
region.put(put);
if (i != 0 && i % compactInterval == 0) {
//System.out.println("iteration = " + i);
region.compactStores(true);
}
if (i % 10 == 5L) {
expectedCount++;
}
if (i != 0 && i % flushAndScanInterval == 0) {
res.clear();
InternalScanner scanner = region.getScanner(scan);
if (toggle) {
flushThread.flush();
}
while (scanner.next(res)) ;
if (!toggle) {
flushThread.flush();
}
Assert.assertEquals("i=" + i, expectedCount, res.size());
toggle = !toggle;
}
}
flushThread.done();
flushThread.join();
flushThread.checkNoError();
}
protected class FlushThread extends Thread {
private volatile boolean done;
private Throwable error = null;
public void done() {
done = true;
synchronized (this) {
interrupt();
}
}
public void checkNoError() {
if (error != null) {
Assert.assertNull(error);
}
}
@Override
public void run() {
done = false;
while (!done) {
synchronized (this) {
try {
wait();
} catch (InterruptedException ignored) {
if (done) {
break;
}
}
}
try {
region.flushcache();
} catch (IOException e) {
if (!done) {
LOG.error("Error while flusing cache", e);
error = e;
}
break;
}
}
}
public void flush() {
synchronized (this) {
notify();
}
}
}
/**
* Writes very wide records and scans for the latest row every time.
* Flushes and compacts the region every now and then to keep things
* realistic.
*
* @throws IOException by flush / scan / compaction
* @throws InterruptedException when joining threads
*/
public void testWritesWhileScanning()
throws IOException, InterruptedException {
byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
int testCount = 100;
int numRows = 1;
int numFamilies = 10;
int numQualifiers = 100;
int flushInterval = 7;
int compactInterval = 5 * flushInterval;
byte[][] families = new byte[numFamilies][];
for (int i = 0; i < numFamilies; i++) {
families[i] = Bytes.toBytes("family" + i);
}
byte[][] qualifiers = new byte[numQualifiers][];
for (int i = 0; i < numQualifiers; i++) {
qualifiers[i] = Bytes.toBytes("qual" + i);
}
String method = "testWritesWhileScanning";
initHRegion(tableName, method, families);
PutThread putThread = new PutThread(numRows, families, qualifiers);
putThread.start();
FlushThread flushThread = new FlushThread();
flushThread.start();
Scan scan = new Scan();
scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("row0"))));
int expectedCount = numFamilies * numQualifiers;
List<KeyValue> res = new ArrayList<KeyValue>();
long prevTimestamp = 0L;
for (int i = 0; i < testCount; i++) {
if (i != 0 && i % compactInterval == 0) {
region.compactStores(true);
}
if (i != 0 && i % flushInterval == 0) {
//System.out.println("scan iteration = " + i);
flushThread.flush();
}
boolean previousEmpty = res.isEmpty();
res.clear();
InternalScanner scanner = region.getScanner(scan);
while (scanner.next(res)) ;
if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
Assert.assertEquals("i=" + i, expectedCount, res.size());
long timestamp = res.get(0).getTimestamp();
Assert.assertTrue(timestamp >= prevTimestamp);
prevTimestamp = timestamp;
}
}
putThread.done();
putThread.join();
putThread.checkNoError();
flushThread.done();
flushThread.join();
flushThread.checkNoError();
}
protected class PutThread extends Thread {
private volatile boolean done;
private Throwable error = null;
private int numRows;
private byte[][] families;
private byte[][] qualifiers;
private PutThread(int numRows, byte[][] families,
byte[][] qualifiers) {
this.numRows = numRows;
this.families = families;
this.qualifiers = qualifiers;
}
public void done() {
done = true;
synchronized (this) {
interrupt();
}
}
public void checkNoError() {
if (error != null) {
Assert.assertNull(error);
}
}
@Override
public void run() {
done = false;
int val = 0;
while (!done) {
try {
for (int r = 0; r < numRows; r++) {
byte[] row = Bytes.toBytes("row" + r);
Put put = new Put(row);
for (int f = 0; f < families.length; f++) {
for (int q = 0; q < qualifiers.length; q++) {
put.add(families[f], qualifiers[q], (long) val,
Bytes.toBytes(val));
}
}
region.put(put);
if (val > 0 && val % 47 == 0){
//System.out.println("put iteration = " + val);
Delete delete = new Delete(row, (long)val-30, null);
region.delete(delete, null, true);
}
val++;
}
} catch (IOException e) {
LOG.error("error while putting records", e);
error = e;
break;
}
}
}
}
/**
* Writes very wide records and gets the latest row every time.
* Flushes and compacts the region every now and then to keep things
* realistic.
*
* @throws IOException by flush / scan / compaction
* @throws InterruptedException when joining threads
*/
public void testWritesWhileGetting()
throws IOException, InterruptedException {
byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
int testCount = 200;
int numRows = 1;
int numFamilies = 10;
int numQualifiers = 100;
int flushInterval = 10;
int compactInterval = 10 * flushInterval;
byte[][] families = new byte[numFamilies][];
for (int i = 0; i < numFamilies; i++) {
families[i] = Bytes.toBytes("family" + i);
}
byte[][] qualifiers = new byte[numQualifiers][];
for (int i = 0; i < numQualifiers; i++) {
qualifiers[i] = Bytes.toBytes("qual" + i);
}
String method = "testWritesWhileScanning";
initHRegion(tableName, method, families);
PutThread putThread = new PutThread(numRows, families, qualifiers);
putThread.start();
FlushThread flushThread = new FlushThread();
flushThread.start();
Get get = new Get(Bytes.toBytes("row0"));
Result result = null;
int expectedCount = numFamilies * numQualifiers;
long prevTimestamp = 0L;
for (int i = 0; i < testCount; i++) {
if (i != 0 && i % compactInterval == 0) {
region.compactStores(true);
}
if (i != 0 && i % flushInterval == 0) {
//System.out.println("iteration = " + i);
flushThread.flush();
}
boolean previousEmpty = result == null || result.isEmpty();
result = region.get(get, null);
if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
Assert.assertEquals("i=" + i, expectedCount, result.size());
// TODO this was removed, now what dangit?!
// search looking for the qualifier in question?
long timestamp = 0;
for (KeyValue kv : result.sorted()) {
if (Bytes.equals(kv.getFamily(), families[0])
&& Bytes.equals(kv.getQualifier(), qualifiers[0])) {
timestamp = kv.getTimestamp();
}
}
Assert.assertTrue(timestamp >= prevTimestamp);
prevTimestamp = timestamp;
}
}
putThread.done();
putThread.join();
putThread.checkNoError();
flushThread.done();
flushThread.join();
flushThread.checkNoError();
}
public void testIndexesScanWithOneDeletedRow() throws IOException {
byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
byte[] family = Bytes.toBytes("family");
//Setting up region
String method = "testIndexesScanWithOneDeletedRow";
initHRegion(tableName, method, new HBaseConfiguration(), family);
Put put = new Put(Bytes.toBytes(1L));
put.add(family, qual1, 1L, Bytes.toBytes(1L));
region.put(put);
region.flushcache();
Delete delete = new Delete(Bytes.toBytes(1L), 1L, null);
//delete.deleteColumn(family, qual1);
region.delete(delete, null, true);
put = new Put(Bytes.toBytes(2L));
put.add(family, qual1, 2L, Bytes.toBytes(2L));
region.put(put);
Scan idxScan = new Scan();
idxScan.addFamily(family);
idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1,
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes(0L))),
new SingleColumnValueFilter(family, qual1,
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes(3L)))
)));
InternalScanner scanner = region.getScanner(idxScan);
List<KeyValue> res = new ArrayList<KeyValue>();
//long start = System.nanoTime();
while (scanner.next(res)) ;
//long end = System.nanoTime();
//System.out.println("memStoreEmpty=" + memStoreEmpty + ", time=" + (end - start)/1000000D);
assertEquals(1L, res.size());
}
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)
throws IOException {

TestStore.java

@@ -240,7 +240,7 @@ public class TestStore extends TestCase {
private void flush(int storeFilessize) throws IOException{
this.store.snapshot();
this.store.flushCache(id++);
flushStore(store, id++);
assertEquals(storeFilessize, this.store.getStorefiles().size());
assertEquals(0, this.store.memstore.kvset.size());
}
@@ -283,7 +283,7 @@ public class TestStore extends TestCase {
assertTrue(ret > 0);
// then flush.
this.store.flushCache(id++);
flushStore(store, id++);
assertEquals(1, this.store.getStorefiles().size());
// from the one we inserted up there, and a new one
assertEquals(2, this.store.memstore.kvset.size());
@@ -309,4 +309,11 @@ public class TestStore extends TestCase {
assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
}
private static void flushStore(Store store, long id) throws IOException {
StoreFlusher storeFlusher = store.getStoreFlusher(id);
storeFlusher.prepare();
storeFlusher.flushCache();
storeFlusher.commit();
}
}