diff --git a/core/src/main/java/org/apache/hadoop/hbase/HConstants.java b/core/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 7924b948164..e31f270f2e1 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -286,6 +286,8 @@ public interface HConstants {
    */
   public static int RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };
 
+  public static final String REGION_IMPL = "hbase.hregion.impl";
+
   /** modifyTable op for replacing the table descriptor */
   public static enum Modify {
     CLOSE_REGION,
diff --git a/core/src/main/java/org/apache/hadoop/hbase/HMerge.java b/core/src/main/java/org/apache/hadoop/hbase/HMerge.java
index 80c3c649324..63afdd54427 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/HMerge.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/HMerge.java
@@ -19,12 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +38,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
 /**
  * A non-instantiable class that has a static method capable of compacting
  * a table by merging adjacent regions.
@@ -152,12 +152,12 @@ class HMerge implements HConstants {
     for (int i = 0; i < info.length - 1; i++) {
       if (currentRegion == null) {
         currentRegion =
-          new HRegion(tabledir, hlog, fs, conf, info[i], null);
+          HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null);
         currentRegion.initialize(null, null);
         currentSize = currentRegion.getLargestHStoreSize();
       }
       nextRegion =
-        new HRegion(tabledir, hlog, fs, conf, info[i + 1], null);
+        HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null);
       nextRegion.initialize(null, null);
       nextSize = nextRegion.getLargestHStoreSize();
@@ -326,7 +326,7 @@ class HMerge implements HConstants {
       // Scan root region to find all the meta regions
-      root = new HRegion(rootTableDir, hlog, fs, conf,
+      root = HRegion.newHRegion(rootTableDir, hlog, fs, conf,
           HRegionInfo.ROOT_REGIONINFO, null);
       root.initialize(null, null);
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2c9b1022cef..a5db9da7d30 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,22 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -70,6 +54,23 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * HRegion stores data for a certain region of a table. It stores all columns
  * for each row. A given table consists of one or more HRegions.
@@ -238,7 +239,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
   }
 
   /**
-   * HRegion constructor.
+   * HRegion constructor. This constructor should only be used for testing and
+   * extensions. Instances of HRegion should be instantiated with the
+   * {@link org.apache.hadoop.hbase.regionserver.HRegion#newHRegion(org.apache.hadoop.fs.Path, HLog, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method.
+   *
    *
    * @param basedir qualified path of directory where region should be located,
    * usually the table directory.
@@ -256,6 +260,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    * @param flushListener an object that implements CacheFlushListener or null
    * making progress to master -- otherwise master might think region deploy
    * failed. Can be null.
+   *
+   * @see org.apache.hadoop.hbase.regionserver.HRegion#newHRegion(org.apache.hadoop.fs.Path, HLog, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
+   */
   public HRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
       HRegionInfo regionInfo, FlushRequester flushListener) {
@@ -686,10 +693,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     // Create a region instance and then move the splits into place under
     // regionA and regionB.
     HRegion regionA =
-      new HRegion(basedir, log, fs, conf, regionAInfo, null);
+      HRegion.newHRegion(basedir, log, fs, conf, regionAInfo, null);
     moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
     HRegion regionB =
-      new HRegion(basedir, log, fs, conf, regionBInfo, null);
+      HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
     moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
     HRegion regions[] = new HRegion [] {regionA, regionB};
@@ -927,7 +934,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  private boolean internalFlushcache() throws IOException {
+  protected boolean internalFlushcache() throws IOException {
     final long startTime = System.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
@@ -954,12 +961,19 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     this.updatesLock.writeLock().lock();
     // Get current size of memstores.
     final long currentMemStoreSize = this.memstoreSize.get();
+    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
     try {
-      for (Store s: stores.values()) {
-        s.snapshot();
-      }
       sequenceId = log.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
+
+      for (Store s : stores.values()) {
+        storeFlushers.add(s.getStoreFlusher(completeSequenceId));
+      }
+
+      // prepare flush (take a snapshot)
+      for (StoreFlusher flusher : storeFlushers) {
+        flusher.prepare();
+      }
     } finally {
       this.updatesLock.writeLock().unlock();
     }
@@ -973,12 +987,25 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       // A.  Flush memstore to all the HStores.
       // Keep running vector of all store files that includes both old and the
       // just-made new flush store file.
-      for (Store hstore: stores.values()) {
-        boolean needsCompaction = hstore.flushCache(completeSequenceId);
+
+      for (StoreFlusher flusher : storeFlushers) {
+        flusher.flushCache();
+      }
+
+      internalPreFlushcacheCommit();
+
+      /*
+       * Switch between memstore and the new store file(s).
+       */
+      for (StoreFlusher flusher : storeFlushers) {
+        boolean needsCompaction = flusher.commit();
         if (needsCompaction) {
           compactionRequested = true;
         }
       }
+
+      storeFlushers.clear();
+
       // Set down the memstore size by amount of flush.
       this.memstoreSize.addAndGet(-currentMemStoreSize);
     } catch (Throwable t) {
@@ -1022,6 +1049,15 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     return compactionRequested;
   }
 
+  /**
+   * A hook for sub-classes wishing to perform operations prior to the
+   * cache flush commit stage.
+   *
+   * @throws IOException allow children to throw exception
+   */
+  protected void internalPreFlushcacheCommit() throws IOException {
+  }
+
   /**
    * Get the sequence number to be associated with this cache flush. Used by
    * TransactionalRegion to not complete pending transactions.
@@ -1117,13 +1153,18 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
           scan.addFamily(family);
         }
       }
-      return new RegionScanner(scan, additionalScanners);
+      return instantiateInternalScanner(scan, additionalScanners);
     } finally {
       newScannerLock.readLock().unlock();
     }
   }
 
+  protected InternalScanner instantiateInternalScanner(Scan scan,
+      List<KeyValueScanner> additionalScanners) throws IOException {
+    return new RegionScanner(scan, additionalScanners);
+  }
+
 //////////////////////////////////////////////////////////////////////////////
 // set() methods for client use.
 //////////////////////////////////////////////////////////////////////////////
@@ -1976,6 +2017,45 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
   }
 
   // Utility methods
+  /**
+   * A utility method to create new instances of HRegion based on the
+   * {@link org.apache.hadoop.hbase.HConstants#REGION_IMPL} configuration
+   * property.
+   * @param basedir qualified path of directory where region should be located,
+   * usually the table directory.
+   * @param log The HLog is the outbound log for any updates to the HRegion
+   * (There's a single HLog for all the HRegions on a single HRegionServer.)
+   * The log file is a logfile from the previous execution that's
+   * custom-computed for this HRegion. The HRegionServer computes and sorts the
+   * appropriate log info for this HRegion. If there is a previous log file
+   * (implying that the HRegion has been written-to before), then read it from
+   * the supplied path.
+   * @param fs is the filesystem.
+   * @param conf is global configuration settings.
+   * @param regionInfo - HRegionInfo that describes the region
+   * @param flushListener an object that implements CacheFlushListener or null
+   * making progress to master -- otherwise master might think region deploy
+   * failed. Can be null.
+   * @return the new instance
+   */
+  public static HRegion newHRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+      HRegionInfo regionInfo, FlushRequester flushListener) {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends HRegion> regionClass =
+          (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
+
+      Constructor<? extends HRegion> c =
+          regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
+              Configuration.class, HRegionInfo.class, FlushRequester.class);
+
+      return c.newInstance(basedir, log, fs, conf, regionInfo, flushListener);
+    } catch (Throwable e) {
+      // todo: what should I throw here?
+      throw new IllegalStateException("Could not instantiate a region instance.", e);
+    }
+  }
 
   /**
   * Convenience method creating new HRegions. Used by createTable and by the
@@ -1998,7 +2078,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
-    HRegion region = new HRegion(tableDir,
+    HRegion region = HRegion.newHRegion(tableDir,
       new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME),
           new Path(regionDir, HREGION_OLDLOGDIR_NAME), conf, null),
       fs, conf, info, null);
@@ -2028,7 +2108,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     if (info == null) {
       throw new NullPointerException("Passed region info is null");
     }
-    HRegion r = new HRegion(
+    HRegion r = HRegion.newHRegion(
       HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
       log, FileSystem.get(conf), conf, info, null);
     r.initialize(null, null);
@@ -2335,7 +2415,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
       LOG.debug("Files for new region");
       listPaths(fs, newRegionDir);
     }
-    HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
+    HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null);
     dstRegion.initialize(null, null);
     dstRegion.compactStores();
     if (LOG.isDebugEnabled()) {
@@ -2592,9 +2672,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
     String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
     // Currently expects tables have one region only.
     if (p.getName().startsWith(rootStr)) {
-      region = new HRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
+      region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
     } else if (p.getName().startsWith(metaStr)) {
-      region = new HRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
+      region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
         null);
     } else {
       throw new IOException("Not a known catalog table: " + p.toString());
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index be7072a720c..e6c8bc62e68 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1520,7 +1520,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
       throws IOException {
-    HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+    HRegion r = HRegion.newHRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
         .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
         this.cacheFlusher);
     r.initialize(null, new Progressable() {
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 87cdb476b86..533f67b7081 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -116,7 +116,7 @@ public class MemStore implements HeapSize {
 
   /**
    * Creates a snapshot of the current memstore.
-   * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
+   * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
    * To get the snapshot made by this method, use {@link #getSnapshot()}
    */
   void snapshot() {
@@ -156,7 +156,7 @@ public class MemStore implements HeapSize {
    * call to {@link #snapshot()}
    * @return Return snapshot.
    * @see {@link #snapshot()}
-   * @see {@link #clearSnapshot(java.util.Map)}
+   * @see {@link #clearSnapshot(SortedSet)}
    */
   KeyValueSkipListSet getSnapshot() {
     return this.snapshot;
@@ -168,7 +168,7 @@ public class MemStore implements HeapSize {
    * @throws UnexpectedException
    * @see {@link #snapshot()}
    */
-  void clearSnapshot(final KeyValueSkipListSet ss)
+  void clearSnapshot(final SortedSet<KeyValue> ss)
       throws UnexpectedException {
     this.lock.writeLock().lock();
     try {
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
index d9e285b37f7..6c4580e25d3 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
@@ -104,7 +104,7 @@ public class ScanDeleteTracker implements DeleteTracker {
   @Override
   public boolean isDeleted(byte [] buffer, int qualifierOffset,
       int qualifierLength, long timestamp) {
-    if (timestamp < familyStamp) {
+    if (timestamp <= familyStamp) {
       return true;
     }
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index b460ad62018..af00375b35c 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -380,8 +381,11 @@ public class Store implements HConstants, HeapSize {
 
     if (maxSeqIdInLog > -1) {
       // We read some edits, so we should flush the memstore
-      this.snapshot();
-      boolean needCompaction = this.flushCache(maxSeqIdInLog);
+      StoreFlusher flusher = getStoreFlusher(maxSeqIdInLog);
+      flusher.prepare();
+      flusher.flushCache();
+      boolean needCompaction = flusher.commit();
+
      if (needCompaction) {
        this.compact(false);
      }
@@ -499,7 +503,7 @@ public class Store implements HConstants, HeapSize {
 
   /**
    * Snapshot this stores memstore. Call before running
-   * {@link #flushCache(long)} so it has some work to do.
+   * {@link #flushCache(long, SortedSet)} so it has some work to do.
    */
   void snapshot() {
     this.memstore.snapshot();
@@ -512,21 +516,13 @@ public class Store implements HConstants, HeapSize {
    * @return true if a compaction is needed
    * @throws IOException
    */
-  boolean flushCache(final long logCacheFlushId) throws IOException {
-    // Get the snapshot to flush. Presumes that a call to
-    // this.memstore.snapshot() has happened earlier up in the chain.
-    KeyValueSkipListSet snapshot = this.memstore.getSnapshot();
+  private StoreFile flushCache(final long logCacheFlushId,
+      SortedSet<KeyValue> snapshot) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot. The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    StoreFile sf = internalFlushCache(snapshot, logCacheFlushId);
-    if (sf == null) {
-      return false;
-    }
-    // Add new file to store files. Clear snapshot too while we have the
-    // Store write lock.
-    int size = updateStorefiles(logCacheFlushId, sf, snapshot);
-    return size >= this.compactionThreshold;
+    return internalFlushCache(snapshot, logCacheFlushId);
+  }
 
   /*
@@ -535,7 +531,7 @@ public class Store implements HConstants, HeapSize {
    * @return StoreFile created.
    * @throws IOException
    */
-  private StoreFile internalFlushCache(final KeyValueSkipListSet set,
+  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
       final long logCacheFlushId)
   throws IOException {
     HFile.Writer writer = null;
@@ -605,20 +601,18 @@ public class Store implements HConstants, HeapSize {
    * @param sf
    * @param set That was used to make the passed file p.
    * @throws IOException
-   * @return Count of store files.
+   * @return Whether compaction is required.
    */
-  private int updateStorefiles(final long logCacheFlushId,
-    final StoreFile sf, final KeyValueSkipListSet set)
+  private boolean updateStorefiles(final long logCacheFlushId,
+    final StoreFile sf, final SortedSet<KeyValue> set)
   throws IOException {
-    int count = 0;
     this.lock.writeLock().lock();
     try {
       this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
-      count = this.storefiles.size();
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
       this.memstore.clearSnapshot(set);
-      return count;
+      return this.storefiles.size() >= this.compactionThreshold;
     } finally {
       this.lock.writeLock().unlock();
     }
   }
@@ -1513,6 +1507,42 @@ public class Store implements HConstants, HeapSize {
     }
   }
 
+  public StoreFlusher getStoreFlusher(long cacheFlushId) {
+    return new StoreFlusherImpl(cacheFlushId);
+  }
+
+  private class StoreFlusherImpl implements StoreFlusher {
+
+    private long cacheFlushId;
+    private SortedSet<KeyValue> snapshot;
+    private StoreFile storeFile;
+
+    private StoreFlusherImpl(long cacheFlushId) {
+      this.cacheFlushId = cacheFlushId;
+    }
+
+    @Override
+    public void prepare() {
+      memstore.snapshot();
+      this.snapshot = memstore.getSnapshot();
+    }
+
+    @Override
+    public void flushCache() throws IOException {
+      storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+    }
+
+    @Override
+    public boolean commit() throws IOException {
+      if (storeFile == null) {
+        return false;
+      }
+      // Add new file to store files. Clear snapshot too while we have
+      // the Store write lock.
+      return Store.this.updateStorefiles(cacheFlushId, storeFile, snapshot);
+    }
+  }
+
   /**
    * See if there's too much store files in this store
    * @return true if number of store files is greater than
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
new file mode 100644
index 00000000000..1c136396178
--- /dev/null
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+/**
+ * A package-protected interface for store flushing.
+ * A store flusher carries the state required to prepare/flush/commit the
+ * store's cache.
+ */
+interface StoreFlusher {
+
+  /**
+   * Prepare for a store flush (create snapshot).
+   *
+   * Requires pausing writes.
+   *
+   * A very short operation.
+   */
+  void prepare();
+
+  /**
+   * Flush the cache (create the new store file).
+   *
+   * A lengthy operation which doesn't require locking out any function
+   * of the store.
+   *
+   * @throws IOException in case the flush fails
+   */
+  void flushCache() throws IOException;
+
+  /**
+   * Commit the flush - add the store file to the store and clear the
+   * memstore snapshot.
+   *
+   * Requires pausing scans.
+   *
+   * A very short operation.
+   *
+   * @return whether a compaction is required
+   * @throws IOException in case the commit fails
+   */
+  boolean commit() throws IOException;
+
+}
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 616ba67e5de..c71fa428100 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -19,13 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -41,13 +34,26 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 /**
  * Basic stand-alone testing of HRegion.
@@ -64,13 +70,13 @@ public class TestHRegion extends HBaseTestCase {
   private final int MAX_VERSIONS = 2;
 
   // Test names
-  private final byte[] tableName = Bytes.toBytes("testtable");;
-  private final byte[] qual1 = Bytes.toBytes("qual1");
-  private final byte[] qual2 = Bytes.toBytes("qual2");
-  private final byte[] qual3 = Bytes.toBytes("qual3");
-  private final byte[] value1 = Bytes.toBytes("value1");
-  private final byte[] value2 = Bytes.toBytes("value2");
-  private final byte [] row = Bytes.toBytes("rowA");
+  protected final byte[] tableName = Bytes.toBytes("testtable");
+  protected final byte[] qual1 = Bytes.toBytes("qual1");
+  protected final byte[] qual2 = Bytes.toBytes("qual2");
+  protected final byte[] qual3 = Bytes.toBytes("qual3");
+  protected final byte[] value1 = Bytes.toBytes("value1");
+  protected final byte[] value2 = Bytes.toBytes("value2");
+  protected final byte [] row = Bytes.toBytes("rowA");
 
   /**
    * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1799,6 +1805,378 @@ public class TestHRegion extends HBaseTestCase {
     }
   }
 
+
+  /**
+   * Flushes the cache in a thread while scanning. The tests verify that the
+   * scan is coherent - e.g. the returned results are always of the same or
+   * later update as the previous results.
+   * @throws IOException scan / compact
+   * @throws InterruptedException thread join
+   */
+  public void testFlushCacheWhileScanning()
+      throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testFlushCacheWhileScanning");
+    byte[] family = Bytes.toBytes("family");
+    int numRows = 1000;
+    int flushAndScanInterval = 10;
+    int compactInterval = 10 * flushAndScanInterval;
+
+    String method = "testFlushCacheWhileScanning";
+    initHRegion(tableName, method, family);
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    scan.setFilter(new SingleColumnValueFilter(family, qual1,
+        CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L))));
+
+    int expectedCount = 0;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    boolean toggle = true;
+    for (long i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(family, qual1, Bytes.toBytes(i % 10));
+      region.put(put);
+
+      if (i != 0 && i % compactInterval == 0) {
+        //System.out.println("iteration = " + i);
+        region.compactStores(true);
+      }
+
+      if (i % 10 == 5L) {
+        expectedCount++;
+      }
+
+      if (i != 0 && i % flushAndScanInterval == 0) {
+        res.clear();
+        InternalScanner scanner = region.getScanner(scan);
+        if (toggle) {
+          flushThread.flush();
+        }
+        while (scanner.next(res)) ;
+        if (!toggle) {
+          flushThread.flush();
+        }
+        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        toggle = !toggle;
+      }
+    }
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+  protected class FlushThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      while (!done) {
+        synchronized (this) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+            if (done) {
+              break;
+            }
+          }
+        }
+        try {
+          region.flushcache();
+        } catch (IOException e) {
+          if (!done) {
+            LOG.error("Error while flushing cache", e);
+            error = e;
+          }
+          break;
+        }
+      }
+
+    }
+
+    public void flush() {
+      synchronized (this) {
+        notify();
+      }
+
+    }
+  }
+
+  /**
+   * Writes very wide records and scans for the latest every time.
+   * Flushes and compacts the region every now and then to keep things
+   * realistic.
+   *
+   * @throws IOException by flush / scan / compaction
+   * @throws InterruptedException when joining threads
+   */
+  public void testWritesWhileScanning()
+      throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
+    int testCount = 100;
+    int numRows = 1;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 7;
+    int compactInterval = 5 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileScanning";
+    initHRegion(tableName, method, families);
+    PutThread putThread = new PutThread(numRows, families, qualifiers);
+    putThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+        new BinaryComparator(Bytes.toBytes("row0"))));
+
+    int expectedCount = numFamilies * numQualifiers;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    long prevTimestamp = 0L;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("scan iteration = " + i);
+        flushThread.flush();
+      }
+
+      boolean previousEmpty = res.isEmpty();
+      res.clear();
+      InternalScanner scanner = region.getScanner(scan);
+      while (scanner.next(res)) ;
+      if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
+        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        long timestamp = res.get(0).getTimestamp();
+        Assert.assertTrue(timestamp >= prevTimestamp);
+        prevTimestamp = timestamp;
+      }
+    }
+
+    putThread.done();
+    putThread.join();
+    putThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+  protected class PutThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+    private int numRows;
+    private byte[][] families;
+    private byte[][] qualifiers;
+
+    private PutThread(int numRows, byte[][] families,
+        byte[][] qualifiers) {
+      this.numRows = numRows;
+      this.families = families;
+      this.qualifiers = qualifiers;
+    }
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      int val = 0;
+      while (!done) {
+        try {
+          for (int r = 0; r < numRows; r++) {
+            byte[] row = Bytes.toBytes("row" + r);
+            Put put = new Put(row);
+            for (int f = 0; f < families.length; f++) {
+              for (int q = 0; q < qualifiers.length; q++) {
+                put.add(families[f], qualifiers[q], (long) val,
+                    Bytes.toBytes(val));
+              }
+            }
+            region.put(put);
+            if (val > 0 && val % 47 == 0) {
+              //System.out.println("put iteration = " + val);
+              Delete delete = new Delete(row, (long) val - 30, null);
+              region.delete(delete, null, true);
+            }
+            val++;
+          }
+        } catch (IOException e) {
+          LOG.error("error while putting records", e);
+          error = e;
+          break;
+        }
+      }
+
+    }
+
+  }
+
+
+  /**
+   * Writes very wide records and gets the latest row every time.
+   * Flushes and compacts the region every now and then to keep things
+   * realistic.
+   *
+   * @throws IOException by flush / scan / compaction
+   * @throws InterruptedException when joining threads
+   */
+  public void testWritesWhileGetting()
+      throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testWritesWhileGetting");
+    int testCount = 200;
+    int numRows = 1;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 10;
+    int compactInterval = 10 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileGetting";
+    initHRegion(tableName, method, families);
+    PutThread putThread = new PutThread(numRows, families, qualifiers);
+    putThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Get get = new Get(Bytes.toBytes("row0"));
+    Result result = null;
+
+    int expectedCount = numFamilies * numQualifiers;
+
+    long prevTimestamp = 0L;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("iteration = " + i);
+        flushThread.flush();
+      }
+
+      boolean previousEmpty = result == null || result.isEmpty();
+      result = region.get(get, null);
+      if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
+        Assert.assertEquals("i=" + i, expectedCount, result.size());
+        // TODO this was removed, now what dangit?!
+        // search looking for the qualifier in question?
+        long timestamp = 0;
+        for (KeyValue kv : result.sorted()) {
+          if (Bytes.equals(kv.getFamily(), families[0])
+              && Bytes.equals(kv.getQualifier(), qualifiers[0])) {
+            timestamp = kv.getTimestamp();
+          }
+        }
+        Assert.assertTrue(timestamp >= prevTimestamp);
+        prevTimestamp = timestamp;
+      }
+    }
+
+    putThread.done();
+    putThread.join();
+    putThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+
+  public void testIndexesScanWithOneDeletedRow() throws IOException {
+    byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
+    byte[] family = Bytes.toBytes("family");
+
+    //Setting up region
+    String method = "testIndexesScanWithOneDeletedRow";
+    initHRegion(tableName, method, new HBaseConfiguration(), family);
+
+    Put put = new Put(Bytes.toBytes(1L));
+    put.add(family, qual1, 1L, Bytes.toBytes(1L));
+    region.put(put);
+
+    region.flushcache();
+
+    Delete delete = new Delete(Bytes.toBytes(1L), 1L, null);
+    //delete.deleteColumn(family, qual1);
+    region.delete(delete, null, true);
+
+    put = new Put(Bytes.toBytes(2L));
+    put.add(family, qual1, 2L, Bytes.toBytes(2L));
+    region.put(put);
+
+    Scan idxScan = new Scan();
+    idxScan.addFamily(family);
+    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+        Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1,
+            CompareFilter.CompareOp.GREATER_OR_EQUAL,
+            new BinaryComparator(Bytes.toBytes(0L))),
+            new SingleColumnValueFilter(family, qual1,
+                CompareFilter.CompareOp.LESS_OR_EQUAL,
+                new BinaryComparator(Bytes.toBytes(3L)))
+        )));
+    InternalScanner scanner = region.getScanner(idxScan);
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    //long start = System.nanoTime();
+    while (scanner.next(res)) ;
+    //long end = System.nanoTime();
+    //System.out.println("memStoreEmpty=" + memStoreEmpty + ", time=" + (end - start)/1000000D);
+    assertEquals(1L, res.size());
+
+  }
+
+
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index ea434916428..9f240f265b5 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -240,7 +240,7 @@ public class TestStore extends TestCase {
 
   private void flush(int storeFilessize) throws IOException {
     this.store.snapshot();
-    this.store.flushCache(id++);
+    flushStore(store, id++);
     assertEquals(storeFilessize, this.store.getStorefiles().size());
     assertEquals(0, this.store.memstore.kvset.size());
   }
@@ -283,7 +283,7 @@ public class TestStore extends TestCase {
     assertTrue(ret > 0);
 
     // then flush.
-    this.store.flushCache(id++);
+    flushStore(store, id++);
     assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
     assertEquals(2, this.store.memstore.kvset.size());
@@ -309,4 +309,11 @@ public class TestStore extends TestCase {
     assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
   }
 
+
+  private static void flushStore(Store store, long id) throws IOException {
+    StoreFlusher storeFlusher = store.getStoreFlusher(id);
+    storeFlusher.prepare();
+    storeFlusher.flushCache();
+    storeFlusher.commit();
+  }
 }
\ No newline at end of file
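
Usage sketch (not part of the patch): the extension points introduced above - the hbase.hregion.impl
property consumed by HRegion.newHRegion() and the protected internalPreFlushcacheCommit() /
instantiateInternalScanner() hooks - combine as follows. IdxRegion is a hypothetical subclass name;
only the constructor signature, the overridden hook, and the configuration key come from the diff,
everything else is illustrative.

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    import java.io.IOException;

    /**
     * Hypothetical HRegion extension. HRegion.newHRegion() instantiates it
     * reflectively, so it must expose exactly the constructor signature that
     * newHRegion() looks up via Class.getConstructor().
     */
    public class IdxRegion extends HRegion {

      public IdxRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
          HRegionInfo regionInfo, FlushRequester flushListener) {
        super(basedir, log, fs, conf, regionInfo, flushListener);
      }

      @Override
      protected void internalPreFlushcacheCommit() throws IOException {
        // Runs after every StoreFlusher has written its new store file but
        // before any of them is committed, e.g. to rebuild an index over the
        // flushed data while writes are still blocked.
      }
    }

To activate the subclass it is registered in the configuration, e.g.
conf.setClass(HConstants.REGION_IMPL, IdxRegion.class, HRegion.class);
after which every region the region server opens is an IdxRegion. The three-phase StoreFlusher
protocol itself (prepare under the update lock, flushCache without locks, commit under the store
write lock) is demonstrated by the flushStore() helper in TestStore above.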