HBASE-8024 Make Store flush algorithm pluggable

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1475870 13f79535-47bb-0310-9956-ffa450edef68
Author: sershe
Date: 2013-04-25 17:49:01 +00:00
Parent: e2d6ba1923
Commit: 0c4416a570
12 changed files with 418 additions and 232 deletions
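After this patch the flush path is pluggable at two levels: hbase.hstore.engine.class (StoreEngine.STORE_ENGINE_CLASS_KEY) still selects the whole StoreEngine, and the engine now also owns a StoreFlusher; for the default engine, the new hbase.hstore.defaultengine.storeflusher.class key selects which flusher it instantiates. A minimal configuration sketch follows; the com.example class names are placeholders, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.StoreEngine;

public class PluggableFlushConfig {
  public static Configuration sketch() {
    Configuration conf = HBaseConfiguration.create();
    // Option 1: replace the whole engine (flusher, compaction policy, compactor, file manager).
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, "com.example.MyStoreEngine");
    // Option 2: keep DefaultStoreEngine and swap only the flusher it creates by reflection.
    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, "com.example.MyStoreFlusher");
    return conf;
  }
}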

DefaultStoreEngine.java

@@ -37,13 +37,17 @@ import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolic
  */
 @InterfaceAudience.Private
 public class DefaultStoreEngine extends StoreEngine<
-    RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+    DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
 
+  public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
+      "hbase.hstore.defaultengine.storeflusher.class";
   public static final String DEFAULT_COMPACTOR_CLASS_KEY =
       "hbase.hstore.defaultengine.compactor.class";
   public static final String DEFAULT_COMPACTION_POLICY_CLASS_KEY =
       "hbase.hstore.defaultengine.compactionpolicy.class";
 
+  private static final Class<? extends DefaultStoreFlusher>
+      DEFAULT_STORE_FLUSHER_CLASS = DefaultStoreFlusher.class;
   private static final Class<? extends DefaultCompactor>
       DEFAULT_COMPACTOR_CLASS = DefaultCompactor.class;
   private static final Class<? extends RatioBasedCompactionPolicy>

@@ -69,8 +73,17 @@ public class DefaultStoreEngine extends StoreEngine<
     } catch (Exception e) {
       throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
     }
+    className = conf.get(
+        DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
+    try {
+      storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class, Store.class }, new Object[] { conf, store });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured store flusher '" + className + "'", e);
+    }
   }
 
   @Override
   public CompactionContext createCompaction() {
     return new DefaultCompactionContext();
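DefaultStoreEngine instantiates the configured flusher through a (Configuration, Store) constructor, so any replacement class must expose exactly that constructor. A minimal sketch of such a subclass; MyStoreFlusher is hypothetical and mirrors DummyStoreFlusher in TestDefaultStoreEngine further down.

package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;

// Hypothetical flusher that keeps the default behaviour; a real implementation
// would also override flushSnapshot (see CustomStoreFlusher in TestWALReplay below).
public class MyStoreFlusher extends DefaultStoreFlusher {
  public MyStoreFlusher(Configuration conf, Store store) {
    super(conf, store);
  }
}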

DefaultStoreFlusher.java (new file)

@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Default implementation of StoreFlusher.
+ */
+public class DefaultStoreFlusher extends StoreFlusher {
+  private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
+  private final Object flushLock = new Object();
+
+  public DefaultStoreFlusher(Configuration conf, Store store) {
+    super(conf, store);
+  }
+
+  @Override
+  public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
+      MonitoredTask status) throws IOException {
+    ArrayList<Path> result = new ArrayList<Path>();
+    if (snapshot.size() == 0) return result; // don't flush if there are no entries
+
+    // Use a store scanner to find which rows to flush.
+    long smallestReadPoint = store.getSmallestReadPoint();
+    KeyValueScanner memstoreScanner =
+        new CollectionBackedScanner(snapshot, store.getComparator());
+    InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
+    if (scanner == null) {
+      scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
+    }
+    scanner = postCreateCoprocScanner(scanner);
+    if (scanner == null) {
+      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+    }
+
+    StoreFile.Writer writer;
+    long flushed = 0;
+    try {
+      // TODO: We can fail in the below block before we complete adding this flush to
+      //       list of store files. Add cleanup of anything put on filesystem if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + store + ": creating writer");
+        // Write the map out to the disk
+        writer = store.createWriterInTmp(
+            snapshot.size(), store.getFamily().getCompression(), false, true);
+        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+        try {
+          flushed = performFlush(scanner, writer, smallestReadPoint);
+        } finally {
+          finalizeWriter(writer, cacheFlushId, status);
+        }
+      }
+    } finally {
+      flushedSize.set(flushed);
+      scanner.close();
+    }
+    LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
+        + StringUtils.humanReadableInt(flushed) +", into tmp file " + writer.getPath());
+    result.add(writer.getPath());
+    return result;
+  }
+}

HRegion.java

@@ -1410,7 +1410,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.updatesLock.writeLock().lock();
     long flushsize = this.memstoreSize.get();
     status.setStatus("Preparing to flush by snapshotting stores");
-    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
+    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     long flushSeqId = -1L;
     try {
       // Record the mvcc for all transactions in progress.

@@ -1430,12 +1430,12 @@ public class HRegion implements HeapSize { // , Writable{
       }
       for (Store s : stores.values()) {
-        storeFlushers.add(s.getStoreFlusher(flushSeqId));
+        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
       }
 
       // prepare flush (take a snapshot)
-      for (StoreFlusher flusher : storeFlushers) {
-        flusher.prepare();
+      for (StoreFlushContext flush : storeFlushCtxs) {
+        flush.prepare();
       }
     } finally {
       this.updatesLock.writeLock().unlock();

@@ -1472,19 +1472,19 @@ public class HRegion implements HeapSize { // , Writable{
       // just-made new flush store file. The new flushed file is still in the
       // tmp directory.
 
-      for (StoreFlusher flusher : storeFlushers) {
-        flusher.flushCache(status);
+      for (StoreFlushContext flush : storeFlushCtxs) {
+        flush.flushCache(status);
       }
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit(status);
+      for (StoreFlushContext flush : storeFlushCtxs) {
+        boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
           compactionRequested = true;
         }
       }
-      storeFlushers.clear();
+      storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
       this.addAndGetGlobalMemstoreSize(-flushsize);

HStore.java

@@ -124,7 +124,6 @@ public class HStore implements Store {
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
-  private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final boolean verifyBulkLoads;

@@ -146,14 +145,14 @@
   // Comparing KeyValues
   private final KeyValue.KVComparator comparator;
 
-  final StoreEngine<?, ?, ?> storeEngine;
+  final StoreEngine<?, ?, ?, ?> storeEngine;
 
   private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
   private final OffPeakHours offPeakHours;
 
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
-  private static int flush_retries_number;
-  private static int pauseTime;
+  private int flushRetriesNumber;
+  private int pauseTime;
   private long blockingFileCount;
@@ -223,17 +222,13 @@
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction manager.
-    if (HStore.flush_retries_number == 0) {
-      HStore.flush_retries_number = conf.getInt(
-          "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
-      HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
-          HConstants.DEFAULT_HBASE_SERVER_PAUSE);
-      if (HStore.flush_retries_number <= 0) {
-        throw new IllegalArgumentException(
-            "hbase.hstore.flush.retries.number must be > 0, not "
-                + HStore.flush_retries_number);
-      }
-    }
+    flushRetriesNumber = conf.getInt(
+        "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+    pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+    if (flushRetriesNumber <= 0) {
+      throw new IllegalArgumentException(
+          "hbase.hstore.flush.retries.number must be > 0, not "
+              + flushRetriesNumber);
+    }
   }
@@ -645,10 +640,10 @@
    * @param snapshotTimeRangeTracker
    * @param flushedSize The number of bytes flushed
    * @param status
-   * @return Path The path name of the tmp file to which the store was flushed
+   * @return The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  protected Path flushCache(final long logCacheFlushId,
+  protected List<Path> flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,

@@ -658,20 +653,21 @@
     // 'snapshot', the next time flush comes around.
     // Retry after catching exception when flushing, otherwise server will abort
     // itself
+    StoreFlusher flusher = storeEngine.getStoreFlusher();
     IOException lastException = null;
-    for (int i = 0; i < HStore.flush_retries_number; i++) {
+    for (int i = 0; i < flushRetriesNumber; i++) {
       try {
-        Path pathName = internalFlushCache(snapshot, logCacheFlushId,
-            snapshotTimeRangeTracker, flushedSize, status);
+        List<Path> pathNames = flusher.flushSnapshot(
+            snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+        Path lastPathName = null;
         try {
-          // Path name is null if there is no entry to flush
-          if (pathName != null) {
+          for (Path pathName : pathNames) {
+            lastPathName = pathName;
             validateStoreFile(pathName);
           }
-          return pathName;
+          return pathNames;
         } catch (Exception e) {
-          LOG.warn("Failed validating store file " + pathName
-              + ", retring num=" + i, e);
+          LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
           if (e instanceof IOException) {
             lastException = (IOException) e;
           } else {
@@ -695,109 +691,6 @@
     throw lastException;
   }
 
-  /*
-   * @param cache
-   * @param logCacheFlushId
-   * @param snapshotTimeRangeTracker
-   * @param flushedSize The number of bytes flushed
-   * @return Path The path name of the tmp file to which the store was flushed
-   * @throws IOException
-   */
-  private Path internalFlushCache(final SortedSet<KeyValue> set,
-      final long logCacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker,
-      AtomicLong flushedSize,
-      MonitoredTask status)
-      throws IOException {
-    StoreFile.Writer writer;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = region.getSmallestReadPoint();
-    long flushed = 0;
-    Path pathName;
-    // Don't flush if there are no entries.
-    if (set.size() == 0) {
-      return null;
-    }
-    // Use a store scanner to find which rows to flush.
-    // Note that we need to retain deletes, hence
-    // treat this as a minor compaction.
-    InternalScanner scanner = null;
-    KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
-    if (this.getCoprocessorHost() != null) {
-      scanner = this.getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
-    }
-    if (scanner == null) {
-      Scan scan = new Scan();
-      scan.setMaxVersions(scanInfo.getMaxVersions());
-      scanner = new StoreScanner(this, scanInfo, scan,
-          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
-          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
-    }
-    if (this.getCoprocessorHost() != null) {
-      InternalScanner cpScanner =
-          this.getCoprocessorHost().preFlush(this, scanner);
-      // NULL scanner returned from coprocessor hooks means skip normal processing
-      if (cpScanner == null) {
-        return null;
-      }
-      scanner = cpScanner;
-    }
-    try {
-      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
-      // TODO: We can fail in the below block before we complete adding this
-      // flush to list of store files. Add cleanup of anything put on filesystem
-      // if we fail.
-      synchronized (flushLock) {
-        status.setStatus("Flushing " + this + ": creating writer");
-        // A. Write the map out to the disk
-        writer = createWriterInTmp(set.size());
-        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-        pathName = writer.getPath();
-        try {
-          List<KeyValue> kvs = new ArrayList<KeyValue>();
-          boolean hasMore;
-          do {
-            hasMore = scanner.next(kvs, compactionKVMax);
-            if (!kvs.isEmpty()) {
-              for (KeyValue kv : kvs) {
-                // If we know that this KV is going to be included always, then let us
-                // set its memstoreTS to 0. This will help us save space when writing to
-                // disk.
-                if (kv.getMemstoreTS() <= smallestReadPoint) {
-                  // let us not change the original KV. It could be in the memstore
-                  // changing its memstoreTS could affect other threads/scanners.
-                  kv = kv.shallowCopy();
-                  kv.setMemstoreTS(0);
-                }
-                writer.append(kv);
-                flushed += this.memstore.heapSizeChange(kv, true);
-              }
-              kvs.clear();
-            }
-          } while (hasMore);
-        } finally {
-          // Write out the log sequence number that corresponds to this output
-          // hfile. Also write current time in metadata as minFlushTime.
-          // The hfile is current up to and including logCacheFlushId.
-          status.setStatus("Flushing " + this + ": appending metadata");
-          writer.appendMetadata(logCacheFlushId, false);
-          status.setStatus("Flushing " + this + ": closing flushed file");
-          writer.close();
-        }
-      }
-    } finally {
-      flushedSize.set(flushed);
-      scanner.close();
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Flushed " +
-          ", sequenceid=" + logCacheFlushId +
-          ", memsize=" + StringUtils.humanReadableInt(flushed) +
-          ", into tmp file " + pathName);
-    }
-    return pathName;
-  }
-
   /*
    * @param path The pathname of the tmp file into which the store was flushed
    * @param logCacheFlushId
@@ -872,17 +765,18 @@
   /*
    * Change storeFiles adding into place the Reader produced by this new flush.
-   * @param sf
-   * @param set That was used to make the passed file <code>p</code>.
+   * @param sfs Store files
+   * @param set That was used to make the passed file.
    * @throws IOException
    * @return Whether compaction is required.
    */
-  private boolean updateStorefiles(final StoreFile sf,
-      final SortedSet<KeyValue> set)
-      throws IOException {
+  private boolean updateStorefiles(
+      final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
     this.lock.writeLock().lock();
     try {
-      this.storeEngine.getStoreFileManager().insertNewFile(sf);
+      for (StoreFile sf : sfs) {
+        this.storeEngine.getStoreFileManager().insertNewFile(sf);
+      }
       this.memstore.clearSnapshot(set);
     } finally {
       // We need the lock, as long as we are updating the storeFiles
@@ -1747,22 +1641,20 @@
     }
   }
 
-  public StoreFlusher getStoreFlusher(long cacheFlushId) {
+  public StoreFlushContext createFlushContext(long cacheFlushId) {
     return new StoreFlusherImpl(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlusher {
-    private long cacheFlushId;
+  private class StoreFlusherImpl implements StoreFlushContext {
+    private long cacheFlushSeqNum;
     private SortedSet<KeyValue> snapshot;
-    private StoreFile storeFile;
-    private Path storeFilePath;
+    private List<Path> tempFiles;
     private TimeRangeTracker snapshotTimeRangeTracker;
-    private AtomicLong flushedSize;
+    private final AtomicLong flushedSize = new AtomicLong();
 
-    private StoreFlusherImpl(long cacheFlushId) {
-      this.cacheFlushId = cacheFlushId;
-      this.flushedSize = new AtomicLong();
+    private StoreFlusherImpl(long cacheFlushSeqNum) {
+      this.cacheFlushSeqNum = cacheFlushSeqNum;
     }
 
     @Override
@@ -1774,24 +1666,43 @@
     @Override
     public void flushCache(MonitoredTask status) throws IOException {
-      storeFilePath = HStore.this.flushCache(
-          cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
+      tempFiles = HStore.this.flushCache(
+          cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
     }
 
     @Override
     public boolean commit(MonitoredTask status) throws IOException {
-      if (storeFilePath == null) {
+      if (this.tempFiles == null || this.tempFiles.isEmpty()) {
         return false;
       }
-      storeFile = HStore.this.commitFile(storeFilePath, cacheFlushId,
-          snapshotTimeRangeTracker, flushedSize, status);
-      if (HStore.this.getCoprocessorHost() != null) {
-        HStore.this.getCoprocessorHost().postFlush(HStore.this, storeFile);
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
+      for (Path storeFilePath : tempFiles) {
+        try {
+          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
+              snapshotTimeRangeTracker, flushedSize, status));
+        } catch (IOException ex) {
+          LOG.error("Failed to commit store file " + storeFilePath, ex);
+          // Try to delete the files we have committed before.
+          for (StoreFile sf : storeFiles) {
+            Path pathToDelete = sf.getPath();
+            try {
+              sf.deleteReader();
+            } catch (IOException deleteEx) {
+              LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
+              Runtime.getRuntime().halt(1);
+            }
+          }
+          throw new IOException("Failed to commit the flush", ex);
+        }
       }
-      // Add new file to store files. Clear snapshot too while we have
-      // the Store write lock.
-      return HStore.this.updateStorefiles(storeFile, snapshot);
+
+      if (HStore.this.getCoprocessorHost() != null) {
+        for (StoreFile sf : storeFiles) {
+          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
+        }
+      }
+
+      // Add new file to store files. Clear snapshot too while we have the Store write lock.
+      return HStore.this.updateStorefiles(storeFiles, snapshot);
     }
   }
@@ -1807,8 +1718,8 @@
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
-          + (2 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+      ClassSize.align(ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+          + (4 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
       + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

MemStore.java

@@ -958,7 +958,7 @@ public class MemStore implements HeapSize {
    * @param notpresent True if the kv was NOT present in the set.
    * @return Size
    */
-  long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+  static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
     return notpresent ?
         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
         0;

Store.java

@@ -183,7 +183,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
   public int getCompactPriority();
 
-  public StoreFlusher getStoreFlusher(long cacheFlushId);
+  public StoreFlushContext createFlushContext(long cacheFlushId);
 
   // Split oriented methods

StoreEngine.java

@@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
  * they are tied together and replaced together via StoreEngine-s.
  */
 @InterfaceAudience.Private
-public abstract class StoreEngine<
+public abstract class StoreEngine<SF extends StoreFlusher,
     CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+  protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
   protected SFM storeFileManager;

@@ -47,7 +48,7 @@
    */
   public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine<?, ?, ?>>
+  private static final Class<? extends StoreEngine<?, ?, ?, ?>>
       DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
 
   /**

@@ -71,6 +72,13 @@
     return this.storeFileManager;
   }
 
+  /**
+   * @return Store flusher to use.
+   */
+  public StoreFlusher getStoreFlusher() {
+    return this.storeFlusher;
+  }
+
   /**
    * Creates an instance of a compaction context specific to this engine.
    * Doesn't actually select or start a compaction. See CompactionContext class comment.

@@ -86,9 +94,11 @@
   private void createComponentsOnce(
       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
-    assert compactor == null && compactionPolicy == null && storeFileManager == null;
+    assert compactor == null && compactionPolicy == null
+        && storeFileManager == null && storeFlusher == null;
     createComponents(conf, store, kvComparator);
-    assert compactor != null && compactionPolicy != null && storeFileManager != null;
+    assert compactor != null && compactionPolicy != null
+        && storeFileManager != null && storeFlusher != null;
   }
 
   /**

@@ -99,11 +109,11 @@
    * @param kvComparator KVComparator for storeFileManager.
    * @return StoreEngine to use.
    */
-  public static StoreEngine<?, ?, ?> create(
+  public static StoreEngine<?, ?, ?, ?> create(
       Store store, Configuration conf, KVComparator kvComparator) throws IOException {
     String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
     try {
-      StoreEngine<?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
+      StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
          className, new Class[] { }, new Object[] { });
      se.createComponentsOnce(conf, store, kvComparator);
      return se;
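StoreEngine.create reads hbase.hstore.engine.class, instantiates the engine through a no-argument constructor, and then calls createComponentsOnce so the engine can build its flusher, compaction policy, compactor and file manager. A sketch of that resolution path, assuming a helper placed in the org.apache.hadoop.hbase.regionserver package (class and method names here are illustrative only):

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;

public class StoreEngineLookupSketch {
  // Resolve the engine for a store and hand back the flusher HStore.flushCache will use.
  static StoreFlusher flusherFor(Store store, Configuration conf) throws IOException {
    StoreEngine<?, ?, ?, ?> engine = StoreEngine.create(store, conf, new KVComparator());
    return engine.getStoreFlusher();
  }
}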

StoreFlushContext.java (new file)

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+
+/**
+ * A package protected interface for a store flushing.
+ * A store flush context carries the state required to prepare/flush/commit the store's cache.
+ */
+@InterfaceAudience.Private
+interface StoreFlushContext {
+
+  /**
+   * Prepare for a store flush (create snapshot)
+   *
+   * Requires pausing writes.
+   *
+   * A very short operation.
+   */
+  void prepare();
+
+  /**
+   * Flush the cache (create the new store file)
+   *
+   * A length operation which doesn't require locking out any function
+   * of the store.
+   *
+   * @throws IOException in case the flush fails
+   */
+  void flushCache(MonitoredTask status) throws IOException;
+
+  /**
+   * Commit the flush - add the store file to the store and clear the
+   * memstore snapshot.
+   *
+   * Requires pausing scans.
+   *
+   * A very short operation
+   *
+   * @return
+   * @throws IOException
+   */
+  boolean commit(MonitoredTask status) throws IOException;
+}
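The context is driven in three phases, as the HRegion hunks above and TestStore.flushStore below show: prepare while the region pauses writes, flushCache with no store-wide lock held, and commit while scans are paused. A compact sketch of the sequence, assuming a helper placed in the org.apache.hadoop.hbase.regionserver package (names are illustrative):

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.hbase.monitoring.MonitoredTask;

public class FlushContextSketch {
  static boolean flushOneStore(HStore store, long flushSeqId, MonitoredTask status)
      throws IOException {
    StoreFlushContext ctx = store.createFlushContext(flushSeqId);
    ctx.prepare();             // snapshot the memstore; caller has paused writes
    ctx.flushCache(status);    // write the snapshot to tmp file(s); no store-wide lock needed
    return ctx.commit(status); // move files into place, clear snapshot; true = compaction wanted
  }
}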

StoreFlusher.java

@@ -19,47 +19,125 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 
 /**
- * A package protected interface for a store flushing.
- * A store flusher carries the state required to prepare/flush/commit the
- * store's cache.
+ * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
+ * Custom implementation can be provided.
  */
 @InterfaceAudience.Private
-interface StoreFlusher {
+abstract class StoreFlusher {
+  protected Configuration conf;
+  protected Store store;
+
+  public StoreFlusher(Configuration conf, Store store) {
+    this.conf = conf;
+    this.store = store;
+  }
+
   /**
-   * Prepare for a store flush (create snapshot)
-   *
-   * Requires pausing writes.
-   *
-   * A very short operation.
+   * Turns a snapshot of memstore into a set of store files.
+   * @param snapshot Memstore snapshot.
+   * @param cacheFlushSeqNum Log cache flush sequence number.
+   * @param snapshotTimeRangeTracker Time range tracker from the memstore
+   *   pertaining to the snapshot.
+   * @param flushedSize Out parameter for the size of the KVs flushed.
+   * @param status Task that represents the flush operation and may be updated with status.
+   * @return List of files written. Can be empty; must not be null.
    */
-  void prepare();
+  public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
+      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+      throws IOException;
+
+  protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
+      MonitoredTask status) throws IOException {
+    // Write out the log sequence number that corresponds to this output
+    // hfile. Also write current time in metadata as minFlushTime.
+    // The hfile is current up to and including cacheFlushSeqNum.
+    status.setStatus("Flushing " + store + ": appending metadata");
+    writer.appendMetadata(cacheFlushSeqNum, false);
+    status.setStatus("Flushing " + store + ": closing flushed file");
+    writer.close();
+  }
+
+  /** Calls coprocessor to create a flush scanner based on memstore scanner */
+  protected InternalScanner preCreateCoprocScanner(
+      KeyValueScanner memstoreScanner) throws IOException {
+    if (store.getCoprocessorHost() != null) {
+      return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+    }
+    return null;
+  }
+
+  /** Creates the default flush scanner based on memstore scanner */
+  protected InternalScanner createStoreScanner(long smallestReadPoint,
+      KeyValueScanner memstoreScanner) throws IOException {
+    Scan scan = new Scan();
+    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+    return new StoreScanner(store, store.getScanInfo(), scan,
+        Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+        smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+  }
 
   /**
-   * Flush the cache (create the new store file)
-   *
-   * A length operation which doesn't require locking out any function
-   * of the store.
-   *
-   * @throws IOException in case the flush fails
+   * Calls coprocessor to create a scanner based on default flush scanner
+   * @return new or default scanner; if null, flush should not proceed.
    */
-  void flushCache(MonitoredTask status) throws IOException;
+  protected InternalScanner postCreateCoprocScanner(InternalScanner scanner)
+      throws IOException {
+    if (store.getCoprocessorHost() != null) {
+      return store.getCoprocessorHost().preFlush(store, scanner);
+    }
+    return scanner;
+  }
 
   /**
-   * Commit the flush - add the store file to the store and clear the
-   * memstore snapshot.
-   *
-   * Requires pausing scans.
-   *
-   * A very short operation
-   *
-   * @return
-   * @throws IOException
+   * Performs memstore flush, writing data from scanner into sink.
+   * @param scanner Scanner to get data from.
+   * @param sink Sink to write data to. Could be StoreFile.Writer.
+   * @param smallestReadPoint Smallest read point used for the flush.
+   * @return Bytes flushed.
    */
-  boolean commit(MonitoredTask status) throws IOException;
+  protected long performFlush(InternalScanner scanner,
+      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
+    int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    boolean hasMore;
+    long flushed = 0;
+    do {
+      hasMore = scanner.next(kvs, compactionKVMax);
+      if (!kvs.isEmpty()) {
+        for (KeyValue kv : kvs) {
+          // If we know that this KV is going to be included always, then let us
+          // set its memstoreTS to 0. This will help us save space when writing to
+          // disk.
+          if (kv.getMemstoreTS() <= smallestReadPoint) {
+            // let us not change the original KV. It could be in the memstore
+            // changing its memstoreTS could affect other threads/scanners.
+            kv = kv.shallowCopy();
+            kv.setMemstoreTS(0);
+          }
+          sink.append(kv);
+          flushed += MemStore.heapSizeChange(kv, true);
+        }
+        kvs.clear();
+      }
+    } while (hasMore);
+    return flushed;
+  }
 }
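A custom flusher built on the new base class would typically follow the same helper sequence as DefaultStoreFlusher above: build the memstore scanner, let the coprocessor hooks wrap or veto it, stream cells through performFlush, and seal the writer with finalizeWriter. A minimal sketch under those assumptions; SketchStoreFlusher is hypothetical and, like the default, writes a single file.

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;

// Hypothetical flusher showing the intended use of the protected helpers.
public class SketchStoreFlusher extends StoreFlusher {
  public SketchStoreFlusher(Configuration conf, Store store) {
    super(conf, store);
  }

  @Override
  public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
      throws IOException {
    List<Path> result = new ArrayList<Path>();
    if (snapshot.isEmpty()) return result;

    long smallestReadPoint = store.getSmallestReadPoint();
    KeyValueScanner memstoreScanner =
        new CollectionBackedScanner(snapshot, store.getComparator());
    InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
    if (scanner == null) {
      scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
    }
    scanner = postCreateCoprocScanner(scanner);
    if (scanner == null) return result; // coprocessor asked to skip the flush

    StoreFile.Writer writer;
    long flushed = 0;
    try {
      writer = store.createWriterInTmp(
          snapshot.size(), store.getFamily().getCompression(), false, true);
      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
      try {
        flushed = performFlush(scanner, writer, smallestReadPoint);
      } finally {
        finalizeWriter(writer, cacheFlushSeqNum, status);
      }
    } finally {
      flushedSize.set(flushed);
      scanner.close();
    }
    result.add(writer.getPath());
    return result;
  }
}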

TestDefaultStoreEngine.java

@@ -32,6 +32,12 @@ import org.mockito.Mockito;
 @Category(SmallTests.class)
 public class TestDefaultStoreEngine {
+  public static class DummyStoreFlusher extends DefaultStoreFlusher {
+    public DummyStoreFlusher(Configuration conf, Store store) {
+      super(conf, store);
+    }
+  }
+
   public static class DummyCompactor extends DefaultCompactor {
     public DummyCompactor(Configuration conf, Store store) {
       super(conf, store);

@@ -45,15 +51,18 @@
   }
 
   @Test
-  public void testCustomPolicyAndCompactor() throws Exception {
+  public void testCustomParts() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, DummyCompactor.class.getName());
     conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
         DummyCompactionPolicy.class.getName());
+    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        DummyStoreFlusher.class.getName());
     Store mockStore = Mockito.mock(Store.class);
-    StoreEngine<?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
+    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
     Assert.assertTrue(se instanceof DefaultStoreEngine);
     Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
+    Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
     Assert.assertTrue(se.getCompactor() instanceof DummyCompactor);
   }
 }

TestStore.java

@@ -732,10 +732,10 @@ public class TestStore extends TestCase {
   private static void flushStore(HStore store, long id) throws IOException {
-    StoreFlusher storeFlusher = store.getStoreFlusher(id);
-    storeFlusher.prepare();
-    storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
-    storeFlusher.commit(Mockito.mock(MonitoredTask.class));
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+    storeFlushCtx.prepare();
+    storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+    storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }

TestWALReplay.java

@@ -61,10 +61,11 @@ import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -548,6 +549,29 @@
     assertEquals(result.size(), result1b.size());
   }
 
+  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
+  // Only throws exception if throwExceptionWhenFlushing is set true.
+  public static class CustomStoreFlusher extends DefaultStoreFlusher {
+    // Switch between throw and not throw exception in flush
+    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+
+    public CustomStoreFlusher(Configuration conf, Store store) {
+      super(conf, store);
+    }
+
+    @Override
+    public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
+        TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+        throws IOException {
+      if (throwExceptionWhenFlushing.get()) {
+        throw new IOException("Simulated exception by tests");
+      }
+      return super.flushSnapshot(snapshot, cacheFlushId, snapshotTimeRangeTracker,
+          flushedSize, status);
+    }
+  };
+
   /**
    * Test that we could recover the data correctly after aborting flush. In the
    * test, first we abort flush after writing some data, then writing more data
@@ -568,28 +592,12 @@
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
     HLog wal = createWAL(this.conf);
-    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
     Mockito.doReturn(false).when(rsServices).isAborted();
-    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
-        rsServices) {
-      @Override
-      protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
-        return new HStore(this, family, conf) {
-          @Override
-          protected Path flushCache(final long logCacheFlushId,
-              SortedSet<KeyValue> snapshot,
-              TimeRangeTracker snapshotTimeRangeTracker,
-              AtomicLong flushedSize, MonitoredTask status) throws IOException {
-            if (throwExceptionWhenFlushing.get()) {
-              throw new IOException("Simulated exception by tests");
-            }
-            return super.flushCache(logCacheFlushId, snapshot,
-                snapshotTimeRangeTracker, flushedSize, status);
-          }
-        };
-      }
-    };
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        CustomStoreFlusher.class.getName());
+    HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices);
     long seqid = region.initialize();
     // HRegionServer usually does this. It knows the largest seqid across all
     // regions.
@@ -610,7 +618,7 @@
     assertEquals(writtenRowCount, getScannedCount(scanner));
     // Let us flush the region
-    throwExceptionWhenFlushing.set(true);
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
     try {
       region.flushcache();
       fail("Injected exception hasn't been thrown");

@@ -630,7 +638,7 @@
     }
     writtenRowCount += moreRow;
     // call flush again
-    throwExceptionWhenFlushing.set(false);
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
     try {
       region.flushcache();
     } catch (IOException t) {