HBASE-7678 make storefile management pluggable, together with compaction (Sergey)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1448188 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2013-02-20 14:37:57 +00:00
parent e0e0c44e34
commit a6f8131f9d
11 changed files with 280 additions and 124 deletions

View File

@@ -0,0 +1,47 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.lang.reflect.InvocationTargetException;
public class ReflectionUtils {
@SuppressWarnings("unchecked")
public static <T> T instantiateWithCustomCtor(String className,
Class<?>[] ctorArgTypes, Object[] ctorArgs) {
try {
Class<? extends T> resultType = (Class<? extends T>) Class.forName(className);
return resultType.getDeclaredConstructor(ctorArgTypes).newInstance(ctorArgs);
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
"Unable to find " + className, e);
} catch (IllegalAccessException e) {
throw new UnsupportedOperationException(
"Unable to access specified class " + className, e);
} catch (InstantiationException e) {
throw new UnsupportedOperationException(
"Unable to instantiate specified class " + className, e);
} catch (InvocationTargetException e) {
throw new UnsupportedOperationException(
"Constructor threw an exception for " + className, e);
} catch (NoSuchMethodException e) {
throw new UnsupportedOperationException(
"Unable to find suitable constructor for class " + className, e);
}
}
}
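
For orientation, a short usage sketch of the new helper (the engine class name below is hypothetical; conf, store, and comparator stand for values in scope at the call site). It mirrors the StoreEngine.create call site later in this patch: the helper resolves the class by name, picks the declared constructor matching the argument types, invokes it, and wraps every reflection failure in UnsupportedOperationException.

    // Sketch only: "com.example.MyStoreEngine" is a hypothetical implementation class.
    StoreEngine engine = ReflectionUtils.<StoreEngine>instantiateWithCustomCtor(
        "com.example.MyStoreEngine",
        new Class<?>[] { Configuration.class, Store.class, KVComparator.class },
        new Object[] { conf, store, comparator });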

View File

@@ -0,0 +1,45 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
/**
* Default StoreEngine creates the default compactor, policy, and store file manager.
*/
@InterfaceAudience.Private
public class DefaultStoreEngine extends StoreEngine {
public DefaultStoreEngine(Configuration conf, Store store, KVComparator comparator) {
super(conf, store, comparator);
}
@Override
protected void createComponents(PP<StoreFileManager> storeFileManager,
PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
storeFileManager.set(new DefaultStoreFileManager(this.comparator));
compactionPolicy.set(new DefaultCompactionPolicy(this.conf, this.store));
compactor.set(new DefaultCompactor(this.conf, this.store));
}
}
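
A note on how this default gets picked up (see StoreEngine later in this patch): HStore obtains its engine from StoreEngine.create, which reads the implementation class from configuration and falls back to DefaultStoreEngine. Setting the key explicitly is therefore equivalent to the out-of-the-box behavior; a minimal sketch:

    // "hbase.hstore.engine.class" is STORE_ENGINE_CLASS_KEY, defined in StoreEngine.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hstore.engine.class", DefaultStoreEngine.class.getName());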

View File

@@ -153,7 +153,7 @@ public class HStore implements Store {
// Comparing KeyValues
private final KeyValue.KVComparator comparator;
- private Compactor compactor;
+ final Compactor compactor;
private OffPeakCompactions offPeakCompactions;
@@ -222,7 +222,8 @@ public class HStore implements Store {
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
- this.storeFileManager = new DefaultStoreFileManager(this.comparator);
+ StoreEngine engine = StoreEngine.create(this, this.conf, this.comparator);
+ this.storeFileManager = engine.getStoreFileManager();
this.storeFileManager.loadFiles(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -241,9 +242,9 @@ public class HStore implements Store {
+ HStore.flush_retries_number);
}
}
- this.compactionPolicy = CompactionPolicy.create(this, conf);
+ this.compactionPolicy = engine.getCompactionPolicy();
// Get the compaction tool instance for this policy
- this.compactor = compactionPolicy.getCompactor();
+ this.compactor = engine.getCompactor();
}
/**
@@ -1672,6 +1673,7 @@ public class HStore implements Store {
}
@Override
+ // TODO: why is there this and also getNumberOfStorefiles?! Remove one.
public int getStorefilesCount() {
return this.storeFileManager.getStorefileCount();
}

View File

@@ -0,0 +1,134 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* StoreEngine is a factory that can create the objects necessary for HStore to operate.
* Since not all compaction policies, compactors, and store file managers are mutually
* compatible, they are tied together and replaced together via StoreEngines.
*/
@InterfaceAudience.Private
public abstract class StoreEngine {
protected final Store store;
protected final Configuration conf;
protected final KVComparator comparator;
private final PP<CompactionPolicy> compactionPolicy = new PP<CompactionPolicy>();
private final PP<Compactor> compactor = new PP<Compactor>();
private final PP<StoreFileManager> storeFileManager = new PP<StoreFileManager>();
private boolean isInitialized = false;
/**
* The name of the configuration parameter that specifies the class of
* a store engine that is used to manage and compact HBase store files.
*/
public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
private static final Class<? extends StoreEngine>
DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
/**
* @return Compaction policy to use.
*/
public CompactionPolicy getCompactionPolicy() {
createComponentsOnce();
return this.compactionPolicy.get();
}
/**
* @return Compactor to use.
*/
public Compactor getCompactor() {
createComponentsOnce();
return this.compactor.get();
}
/**
* @return Store file manager to use.
*/
public StoreFileManager getStoreFileManager() {
createComponentsOnce();
return this.storeFileManager.get();
}
protected StoreEngine(Configuration conf, Store store, KVComparator comparator) {
this.store = store;
this.conf = conf;
this.comparator = comparator;
}
/**
* Create the StoreEngine's components.
* @param storeFileManager out parameter for StoreFileManager.
* @param compactionPolicy out parameter for CompactionPolicy.
* @param compactor out parameter for Compactor.
*/
protected abstract void createComponents(PP<StoreFileManager> storeFileManager,
PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor);
private void createComponentsOnce() {
if (isInitialized) return;
createComponents(storeFileManager, compactionPolicy, compactor);
isInitialized = true;
}
/**
* Create the StoreEngine configured for the given Store.
* @param store The store; an unfortunate dependency, needed because the store
* is passed to coprocessors via the compactor.
* @param conf Store configuration.
* @param kvComparator KVComparator for storeFileManager.
* @return StoreEngine to use.
*/
public static StoreEngine create(Store store, Configuration conf, KVComparator kvComparator)
throws IOException {
String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
try {
return ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class, Store.class, KVComparator.class },
new Object[] { conf, store, kvComparator });
} catch (Exception e) {
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}
/**
* To allow StoreEngines to have custom dependencies between the three components, we create
* them all in one place. To return multiple values, PP simulates a C++ pointer-to-pointer /
* C# out parameter.
*/
protected static class PP<T> {
private T t = null;
public void set(T t) {
this.t = t;
}
public T get() {
return this.t;
}
}
}
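
To make the extension contract concrete, a sketch of a custom engine follows. MyStoreEngine, MyStoreFileManager, MyCompactionPolicy, and MyCompactor are hypothetical names, not part of this patch; the two requirements visible above are the (Configuration, Store, KVComparator) constructor that StoreEngine.create invokes reflectively, and a createComponents override that fills all three out-parameters.

    // Hypothetical pluggable engine; all My* classes are illustrative only.
    public class MyStoreEngine extends StoreEngine {
      public MyStoreEngine(Configuration conf, Store store, KVComparator comparator) {
        super(conf, store, comparator); // constructor shape expected by StoreEngine.create
      }
      @Override
      protected void createComponents(PP<StoreFileManager> storeFileManager,
          PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
        // The three components are created together, so the policy may assume
        // a matching file manager and compactor.
        storeFileManager.set(new MyStoreFileManager(this.comparator));
        compactionPolicy.set(new MyCompactionPolicy(this.conf, this.store));
        compactor.set(new MyCompactor(this.conf, this.store));
      }
    }

Such an engine would then be enabled with conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, MyStoreEngine.class.getName()).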

View File

@@ -25,33 +25,22 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.regionserver.HStore;
+ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
- import org.apache.hadoop.util.ReflectionUtils;
/**
* A compaction policy determines how to select files for compaction,
* how to compact them, and how to generate the compacted files.
*/
@InterfaceAudience.Private
- public abstract class CompactionPolicy extends Configured {
+ public abstract class CompactionPolicy {
+ protected CompactionConfiguration comConf;
+ protected StoreConfigInformation storeConfigInfo;
- /**
- * The name of the configuration parameter that specifies
- * the class of a compaction policy that is used to compact
- * HBase store files.
- */
- public static final String COMPACTION_POLICY_KEY =
- "hbase.hstore.compaction.policy";
- private static final Class<? extends CompactionPolicy>
- DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
- CompactionConfiguration comConf;
- Compactor compactor;
- HStore store;
+ public CompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+ this.storeConfigInfo = storeConfigInfo;
+ this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
+ }
/**
* This is called before coprocessor preCompactSelection and should filter the candidates
@@ -107,68 +96,7 @@ public abstract class CompactionPolicy extends Configured {
* Inform the policy that some configuration has been changed,
* so cached values should be updated if any.
*/
- public void updateConfiguration() {
- if (getConf() != null && store != null) {
- comConf = new CompactionConfiguration(getConf(), store);
- }
- }
- /**
- * Get the compactor for this policy
- * @return the compactor for this policy
- */
- public Compactor getCompactor() {
- return compactor;
- }
- /**
- * Set the new configuration
- */
- @Override
public void setConf(Configuration conf) {
- super.setConf(conf);
- updateConfiguration();
+ this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
}
- /**
- * Upon construction, this method will be called with the HStore
- * to be governed. It will be called once and only once.
- */
- protected void configureForStore(HStore store) {
- this.store = store;
- updateConfiguration();
- }
- /**
- * Create the CompactionPolicy configured for the given HStore.
- * @param store
- * @param conf
- * @return a CompactionPolicy
- * @throws IOException
- */
- public static CompactionPolicy create(HStore store,
- Configuration conf) throws IOException {
- Class<? extends CompactionPolicy> clazz =
- getCompactionPolicyClass(store.getFamily(), conf);
- CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
- policy.configureForStore(store);
- return policy;
- }
- static Class<? extends CompactionPolicy> getCompactionPolicyClass(
- HColumnDescriptor family, Configuration conf) throws IOException {
- String className = conf.get(COMPACTION_POLICY_KEY,
- DEFAULT_COMPACTION_POLICY_CLASS.getName());
- try {
- Class<? extends CompactionPolicy> clazz =
- Class.forName(className).asSubclass(CompactionPolicy.class);
- return clazz;
- } catch (Exception e) {
- throw new IOException(
- "Unable to load configured region compaction policy '"
- + className + "' for column '" + family.getNameAsString()
- + "'", e);
- }
- }
}
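
After this change, a policy no longer reaches back into HStore: it receives the Configuration and the narrow StoreConfigInformation interface through its constructor (as DefaultCompactionPolicy does below), and runtime reconfiguration goes through setConf instead of the removed updateConfiguration, as the updated tests show. A minimal sketch of a custom policy under these assumptions, using a hypothetical name and extending the concrete default:

    // Hypothetical subclass; it inherits the default selection logic and sees
    // the store only through StoreConfigInformation, never through HStore.
    public class MyCompactionPolicy extends DefaultCompactionPolicy {
      public MyCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
        super(conf, storeConfigInfo); // parent builds the cached CompactionConfiguration
      }
    }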

View File

@@ -32,11 +32,11 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
@InterfaceAudience.Private
public abstract class Compactor {
- CompactionProgress progress;
- CompactionPolicy policy;
+ protected CompactionProgress progress;
+ protected Configuration conf;
- Compactor(final CompactionPolicy policy) {
- this.policy = policy;
+ Compactor(final Configuration conf) {
+ this.conf = conf;
}
/**
@@ -51,10 +51,6 @@ public abstract class Compactor {
public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction) throws IOException;
- public Configuration getConf() {
- return policy.getConf();
- }
public CompactionProgress getProgress() {
return this.progress;
}

View File

@@ -30,7 +30,10 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.regionserver.HStore;
+ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -49,8 +52,8 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
- public DefaultCompactionPolicy() {
- compactor = new DefaultCompactor(this);
+ public DefaultCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+ super(conf, storeConfigInfo);
}
@Override
@@ -78,12 +81,13 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
+ @Override
public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
throws IOException {
// Preliminary compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles);
- long cfTtl = this.store.getStoreFileTtl();
+ long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
@@ -326,7 +330,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
// Major compaction time has elapsed.
- long cfTtl = this.store.getStoreFileTtl();
+ long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.iterator().next();

View File

@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -44,14 +45,16 @@ import org.apache.hadoop.util.StringUtils;
/**
* Compact passed set of files.
- * Create an instance and then call {@ink #compact(Collection, boolean, long)}.
+ * Create an instance and then call {@link #compact(Collection, boolean)}.
*/
@InterfaceAudience.Private
- class DefaultCompactor extends Compactor {
+ public class DefaultCompactor extends Compactor {
private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
+ private final Store store;
- DefaultCompactor(final CompactionPolicy policy) {
- super(policy);
+ public DefaultCompactor(final Configuration conf, final Store store) {
+ super(conf);
+ this.store = store;
}
/**
@@ -72,7 +75,6 @@ class DefaultCompactor extends Compactor {
// Calculate maximum key count after compaction (for blooms)
// Also calculate earliest put timestamp if major compaction
int maxKeyCount = 0;
- Store store = policy.store;
long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file: filesToCompact) {
StoreFile.Reader r = file.getReader();
@@ -116,7 +118,7 @@ class DefaultCompactor extends Compactor {
.getScannersForStoreFiles(filesToCompact, false, false, true);
// Get some configs
- int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
+ int compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
Compression.Algorithm compression = store.getFamily().getCompression();
// Avoid overriding compression setting for major compactions if the user
// has not specified it separately

View File

@@ -589,10 +589,9 @@ public class TestCompaction extends HBaseTestCase {
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
Collection<StoreFile> storeFiles = store.getStorefiles();
- Compactor tool = store.compactionPolicy.getCompactor();
+ Compactor tool = store.compactor;
- List<Path> newFiles =
- tool.compact(storeFiles, false);
+ List<Path> newFiles = tool.compact(storeFiles, false);
// Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf);

View File

@@ -288,7 +288,7 @@ public class TestDefaultCompactSelection extends TestCase {
compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
- store.compactionPolicy.updateConfiguration();
+ store.compactionPolicy.setConf(conf);
try {
// trigger an aged major compaction
compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
@@ -321,7 +321,7 @@ public class TestDefaultCompactSelection extends TestCase {
*/
// set an off-peak compaction threshold
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
- store.compactionPolicy.updateConfiguration();
+ store.compactionPolicy.setConf(this.conf);
// Test with and without the flag.
compactEquals(sfCreate(999, 50, 12, 12, 1), false, true, 50, 12, 12, 1);
compactEquals(sfCreate(999, 50, 12, 12, 1), 12, 12, 1);

View File

@@ -26,8 +26,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.SmallTests;
+ import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.HStore;
+ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -61,11 +64,11 @@ public class PerfTestCompactionPolicies {
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- {new DefaultCompactionPolicy(), 3, 2, 1.2f},
- {new DefaultCompactionPolicy(), 4, 2, 1.2f},
- {new DefaultCompactionPolicy(), 5, 2, 1.2f},
- {new DefaultCompactionPolicy(), 4, 2, 1.3f},
- {new DefaultCompactionPolicy(), 4, 2, 1.4f},
+ {DefaultCompactionPolicy.class, 3, 2, 1.2f},
+ {DefaultCompactionPolicy.class, 4, 2, 1.2f},
+ {DefaultCompactionPolicy.class, 5, 2, 1.2f},
+ {DefaultCompactionPolicy.class, 4, 2, 1.3f},
+ {DefaultCompactionPolicy.class, 4, 2, 1.4f},
});
}
@@ -77,7 +80,8 @@ public class PerfTestCompactionPolicies {
* @param min The min number of files to compact
* @param ratio The ratio that files must be under to be compacted.
*/
- public PerfTestCompactionPolicies(CompactionPolicy cp, int max, int min, float ratio) {
+ public PerfTestCompactionPolicies(Class<? extends CompactionPolicy> cpClass,
+ int max, int min, float ratio) {
this.max = max;
this.min = min;
this.ratio = ratio;
@@ -86,11 +90,7 @@ public class PerfTestCompactionPolicies {
org.apache.log4j.Logger.getLogger(CompactionConfiguration.class).
setLevel(org.apache.log4j.Level.ERROR);
- org.apache.log4j.Logger.getLogger(cp.getClass()).
- setLevel(org.apache.log4j.Level.ERROR);
- this.cp = cp;
+ org.apache.log4j.Logger.getLogger(cpClass).setLevel(org.apache.log4j.Level.ERROR);
Configuration configuration = HBaseConfiguration.create();
@@ -99,11 +99,10 @@ public class PerfTestCompactionPolicies {
configuration.setInt("hbase.hstore.compaction.min", min);
configuration.setFloat("hbase.hstore.compaction.ratio", ratio);
- cp.store = createMockStore();
- //Now set the conf.
- cp.setConf(configuration);
+ HStore store = createMockStore();
+ this.cp = ReflectionUtils.instantiateWithCustomCtor(cpClass.getName(),
+ new Class[] { Configuration.class, StoreConfigInformation.class },
+ new Object[] { configuration, store });
//Used for making paths
random = new Random(42);