HBASE-20259 Doc configs for in-memory-compaction and add detail to

in-memory-compaction logging

Adds logging of CompactingMemStore configuration on construction.

Add logging of detail about Store on creation including memstore type.

Add chapter to refguide on new in-memory compaction feature.
This commit is contained in:
Michael Stack 2018-03-30 12:35:28 -07:00
parent 4b909b890d
commit d4e115bf34
11 changed files with 208 additions and 61 deletions

View File

@ -183,8 +183,8 @@ public class TableDescriptorBuilder {
public static final boolean DEFAULT_NORMALIZATION_ENABLED = false; public static final boolean DEFAULT_NORMALIZATION_ENABLED = false;
/** /**
* Constant that denotes the maximum default size of the memstore after which * Constant that denotes the maximum default size of the memstore in bytes after which
* the contents are flushed to the store files * the contents are flushed to the store files.
*/ */
public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L; public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L;

View File

@ -1732,11 +1732,9 @@ public class HFileBlock implements Cacheable {
// and will save us having to seek the stream backwards to reread the header we // and will save us having to seek the stream backwards to reread the header we
// read the last time through here. // read the last time through here.
ByteBuffer headerBuf = getCachedHeader(offset); ByteBuffer headerBuf = getCachedHeader(offset);
if (LOG.isTraceEnabled()) { LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " +
LOG.trace("Reading " + this.fileContext.getHFileName() + " at offset=" + offset + "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread,
", pread=" + pread + ", verifyChecksum=" + verifyChecksum + ", cachedHeader=" + verifyChecksum, headerBuf, onDiskSizeWithHeader);
headerBuf + ", onDiskSizeWithHeader=" + onDiskSizeWithHeader);
}
// This is NOT same as verifyChecksum. This latter is whether to do hbase // This is NOT same as verifyChecksum. This latter is whether to do hbase
// checksums. Can change with circumstances. The below flag is whether the // checksums. Can change with circumstances. The below flag is whether the
// file has support for checksums (version 2+). // file has support for checksums (version 2+).
@ -1800,9 +1798,7 @@ public class HFileBlock implements Cacheable {
if (!fileContext.isCompressedOrEncrypted()) { if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed(); hFileBlock.sanityCheckUncompressed();
} }
if (LOG.isTraceEnabled()) { LOG.trace("Read {} in {} ns", hFileBlock, duration);
LOG.trace("Read " + hFileBlock + " in " + duration + " ns");
}
// Cache next block header if we read it for the next time through here. // Cache next block header if we read it for the next time through here.
if (nextBlockOnDiskSize != -1) { if (nextBlockOnDiskSize != -1) {
cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),

View File

@ -41,8 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy{ public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
private static final String NAME = "ADAPTIVE";
private static final String name = "ADAPTIVE";
public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY = public static final String ADAPTIVE_COMPACTION_THRESHOLD_KEY =
"hbase.hregion.compacting.memstore.adaptive.compaction.threshold"; "hbase.hregion.compacting.memstore.adaptive.compaction.threshold";
private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5; private static final double ADAPTIVE_COMPACTION_THRESHOLD_DEFAULT = 0.5;
@ -73,12 +72,13 @@ public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrate
if(r < compactionProbability) { if(r < compactionProbability) {
numCellsInVersionedList = versionedList.getNumOfCells(); numCellsInVersionedList = versionedList.getNumOfCells();
compacted = true; compacted = true;
return compact(versionedList, name+" (compaction probability="+compactionProbability+")"); return compact(versionedList,
getName() + " (compaction probability=" + compactionProbability + ")");
} }
} }
compacted = false; compacted = false;
return simpleMergeOrFlatten(versionedList, return simpleMergeOrFlatten(versionedList,
name+" (compaction probability="+compactionProbability+")"); getName() + " (compaction probability=" + compactionProbability + ")");
} }
@Override @Override
@ -112,4 +112,8 @@ public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrate
return Action.FLATTEN; return Action.FLATTEN;
} }
@Override
protected String getName() {
return NAME;
}
} }

View File

@ -29,8 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy{ public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
private static final String NAME = "BASIC";
private static final String name = "BASIC";
public BasicMemStoreCompactionStrategy(Configuration conf, String cfName) { public BasicMemStoreCompactionStrategy(Configuration conf, String cfName) {
super(conf, cfName); super(conf, cfName);
@ -38,6 +37,11 @@ public class BasicMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
@Override @Override
public Action getAction(VersionedSegmentsList versionedList) { public Action getAction(VersionedSegmentsList versionedList) {
return simpleMergeOrFlatten(versionedList, name); return simpleMergeOrFlatten(versionedList, getName());
}
@Override
protected String getName() {
return NAME;
} }
} }

View File

@ -119,6 +119,10 @@ public class CompactingMemStore extends AbstractMemStore {
// initialization of the flush size should happen after initialization of the index type // initialization of the flush size should happen after initialization of the index type
// so do not transfer the following method // so do not transfer the following method
initInmemoryFlushSize(conf); initInmemoryFlushSize(conf);
LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " +
"compactor={}", this.store.getColumnFamilyName(),
StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType,
(this.compactor == null? "NULL": this.compactor.toString()));
} }
@VisibleForTesting @VisibleForTesting
@ -141,8 +145,6 @@ public class CompactingMemStore extends AbstractMemStore {
IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT); IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
inmemoryFlushSize = (long) (inmemoryFlushSize * factor); inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
LOG.info("Setting in-memory flush size threshold to {} and immutable segments index to type={}",
StringUtils.byteDesc(inmemoryFlushSize), indexType);
} }
/** /**

View File

@ -23,14 +23,19 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public class EagerMemStoreCompactionStrategy extends MemStoreCompactionStrategy{ public class EagerMemStoreCompactionStrategy extends MemStoreCompactionStrategy{
private static final String NAME = "EAGER";
private static final String name = "EAGER";
public EagerMemStoreCompactionStrategy(Configuration conf, String cfName) { public EagerMemStoreCompactionStrategy(Configuration conf, String cfName) {
super(conf, cfName); super(conf, cfName);
} }
@Override @Override
public Action getAction(VersionedSegmentsList versionedList) { public Action getAction(VersionedSegmentsList versionedList) {
return compact(versionedList, name); return compact(versionedList, getName());
}
@Override
protected String getName() {
return NAME;
} }
} }

View File

@ -179,6 +179,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
private final boolean verifyBulkLoads; private final boolean verifyBulkLoads;
/**
* Use this counter to track concurrent puts. If TRACE-log is enabled, if we are over the
* threshold set by hbase.region.store.parallel.put.print.threshold (Default is 50) we will
* log a message that identifies the Store experience this high-level of concurrency.
*/
private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
private final int parallelPutCountPrintThreshold; private final int parallelPutCountPrintThreshold;
@ -257,8 +262,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
} }
this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim()); this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
this.dataBlockEncoder = this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
this.comparator = region.getCellComparator(); this.comparator = region.getCellComparator();
// used by ScanQueryMatcher // used by ScanQueryMatcher
@ -270,32 +274,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
// Why not just pass a HColumnDescriptor in here altogether? Even if have // Why not just pass a HColumnDescriptor in here altogether? Even if have
// to clone it? // to clone it?
scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator); scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
MemoryCompactionPolicy inMemoryCompaction = null; this.memstore = getMemstore();
if (this.getTableName().isSystemTable()) {
inMemoryCompaction = MemoryCompactionPolicy
.valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
} else {
inMemoryCompaction = family.getInMemoryCompaction();
}
if (inMemoryCompaction == null) {
inMemoryCompaction =
MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
}
String className;
switch (inMemoryCompaction) {
case NONE:
className = DefaultMemStore.class.getName();
this.memstore = ReflectionUtils.newInstance(DefaultMemStore.class,
new Object[] { conf, this.comparator });
break;
default:
Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
CompactingMemStore.class, CompactingMemStore.class);
className = clz.getName();
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
}
this.offPeakHours = OffPeakHours.getInstance(conf); this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family // Setting up cache configuration for this family
@ -335,13 +315,48 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
} }
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); int confPrintThreshold =
this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
if (confPrintThreshold < 10) { if (confPrintThreshold < 10) {
confPrintThreshold = 10; confPrintThreshold = 10;
} }
this.parallelPutCountPrintThreshold = confPrintThreshold; this.parallelPutCountPrintThreshold = confPrintThreshold;
LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold=" LOG.info("Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, " +
+ parallelPutCountPrintThreshold); "parallelPutCountPrintThreshold={}", getColumnFamilyName(),
this.memstore.getClass().getSimpleName(), policyName,
this.verifyBulkLoads, this.parallelPutCountPrintThreshold);
}
/**
* @return MemStore Instance to use in this store.
*/
private MemStore getMemstore() {
MemStore ms = null;
// Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum!
MemoryCompactionPolicy inMemoryCompaction = null;
if (this.getTableName().isSystemTable()) {
inMemoryCompaction = MemoryCompactionPolicy.valueOf(
conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));
} else {
inMemoryCompaction = family.getInMemoryCompaction();
}
if (inMemoryCompaction == null) {
inMemoryCompaction =
MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
}
switch (inMemoryCompaction) {
case NONE:
ms = ReflectionUtils.newInstance(DefaultMemStore.class,
new Object[]{conf, this.comparator});
break;
default:
Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
CompactingMemStore.class, CompactingMemStore.class);
ms = ReflectionUtils.newInstance(clz, new Object[]{conf, this.comparator, this,
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
}
return ms;
} }
/** /**
@ -706,10 +721,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
lock.readLock().lock(); lock.readLock().lock();
try { try {
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
if (LOG.isTraceEnabled()) { LOG.trace(this.getTableName() + "tableName={}, encodedName={}, columnFamilyName={} is " +
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this "too busy!", this.getRegionInfo().getEncodedName(), this .getColumnFamilyName());
.getColumnFamilyName() + " too Busy!");
}
} }
this.memstore.add(cell, memstoreSizing); this.memstore.add(cell, memstoreSizing);
} finally { } finally {
@ -725,10 +738,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
lock.readLock().lock(); lock.readLock().lock();
try { try {
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
if (LOG.isTraceEnabled()) { LOG.trace(this.getTableName() + "tableName={}, encodedName={}, columnFamilyName={} is " +
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this "too busy!", this.getRegionInfo().getEncodedName(), this .getColumnFamilyName());
.getColumnFamilyName() + " too Busy!");
}
} }
memstore.add(cells, memstoreSizing); memstore.add(cells, memstoreSizing);
} finally { } finally {

View File

@ -73,6 +73,13 @@ public abstract class MemStoreCompactionStrategy {
} }
} }
@Override
public String toString() {
return getName() + ", pipelineThreshold=" + this.pipelineThreshold;
}
protected abstract String getName();
// get next compaction action to apply on compaction pipeline // get next compaction action to apply on compaction pipeline
public abstract Action getAction(VersionedSegmentsList versionedList); public abstract Action getAction(VersionedSegmentsList versionedList);
// update policy stats based on the segment that replaced previous versioned list (in // update policy stats based on the segment that replaced previous versioned list (in
@ -108,5 +115,4 @@ public abstract class MemStoreCompactionStrategy {
cfName, numOfSegments); cfName, numOfSegments);
return Action.COMPACT; return Action.COMPACT;
} }
} }

View File

@ -79,6 +79,11 @@ public class MemStoreCompactor {
compactingMemStore.getFamilyName()); compactingMemStore.getFamilyName());
} }
@Override
public String toString() {
return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
}
/**---------------------------------------------------------------------- /**----------------------------------------------------------------------
* The request to dispatch the compaction asynchronous task. * The request to dispatch the compaction asynchronous task.
* The method returns true if compaction was successfully dispatched, or false if there * The method returns true if compaction was successfully dispatched, or false if there

View File

@ -0,0 +1,113 @@
////
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
////
[[inmemory_compaction]]
= In-memory Compaction
:doctype: book
:numbered:
:toc: left
:icons: font
:experimental:
[[imc.overview]]
== Overview
In-memory Compaction (A.K.A Accordion) is a new feature in hbase-2.0.0.
It was first introduced on the Apache HBase Blog at
link:https://blogs.apache.org/hbase/entry/accordion-hbase-breathes-with-in[Accordion: HBase Breathes with In-Memory Compaction].
Quoting the blog:
____
Accordion reapplies the LSM principal [_Log-Structured-Merge Tree_, the design pattern upon which HBase is based] to MemStore, in order to eliminate redundancies and other overhead while the data is still in RAM. Doing so decreases the frequency of flushes to HDFS, thereby reducing the write amplification and the overall disk footprint. With less flushes, the write operations are stalled less frequently as the MemStore overflows, therefore the write performance is improved. Less data on disk also implies less pressure on the block cache, higher hit rates, and eventually better read response times. Finally, having less disk writes also means having less compaction happening in the background, i.e., less cycles are stolen from productive (read and write) work. All in all, the effect of in-memory compaction can be envisioned as a catalyst that enables the system move faster as a whole.
____
A developer view is available at
link:https://blogs.apache.org/hbase/entry/accordion-developer-view-of-in[Accordion: Developer View of In-Memory Compaction].
In-memory compaction works best when high data churn; overwrites or over-versions
can be eliminated while the data is still in memory. If the writes are all uniques,
it may drag write throughput (In-memory compaction costs CPU). We suggest you test
and compare before deploying to production.
In this section we describe how to enable Accordion and the available configurations.
== Enabling
To enable in-memory compactions, set the _IN_MEMORY_COMPACTION_ attribute
on per column family where you want the behavior. The _IN_MEMORY_COMPACTION_
attribute can have one of three values.
* _NONE_: No in-memory compaction.
* _BASIC_: Basic policy enables flushing and keeps a pipeline of flushes until we trip the pipeline maximum threshold and then we flush to disk. No in-memory compaction but can help throughput as data is moved from the profligate, native ConcurrentSkipListMap data-type to more compact (and efficient) data types.
* _EAGER_: This is _BASIC_ policy plus in-memory compaction of flushes (much like the on-disk compactions done to hfiles); on compaction we apply on-disk rules eliminating versions, duplicates, ttl'd cells, etc.
* _ADAPTIVE_: Adaptive compaction adapts to the workload. It applies either index compaction or data compaction based on the ratio of duplicate cells in the data. Experimental.
To enable _BASIC_ on the _info_ column family in the table _radish_, disable the table and add the attribute to the _info_ column family, and then reenable:
[source,ruby]
----
hbase(main):002:0> disable 'radish'
Took 0.5570 seconds
hbase(main):003:0> alter 'radish', {NAME => 'info', IN_MEMORY_COMPACTION => 'BASIC'}
Updating all regions with the new schema...
All regions updated.
Done.
Took 1.2413 seconds
hbase(main):004:0> describe 'radish'
Table radish is DISABLED
radish
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536', METADATA => {
'IN_MEMORY_COMPACTION' => 'BASIC'}}
1 row(s)
Took 0.0239 seconds
hbase(main):005:0> enable 'radish'
Took 0.7537 seconds
----
Note how the IN_MEMORY_COMPACTION attribute shows as part of the _METADATA_ map.
There is also a global configuration, _hbase.hregion.compacting.memstore.type_ which you can set in your _hbase-site.xml_ file. Use it to set the
default on creation of a new table (On creation of a column family Store, we look first to the column family configuration looking for the
_IN_MEMORY_COMPACTION_ setting, and if none, we then consult the _hbase.hregion.compacting.memstore.type_ value using its content; default is
_BASIC_).
By default, new hbase system tables will have _BASIC_ in-memory compaction set. To specify otherwise,
on new table-creation, set _hbase.hregion.compacting.memstore.type_ to _NONE_ (Note, setting this value
post-creation of system tables will not have a retroactive effect; you will have to alter your tables
to set the in-memory attribute to _NONE_).
When an in-memory flush happens is calculated by dividing the configured region flush size (Set in the table descriptor
or read from _hbase.hregion.memstore.flush.size_) by the number of column families and then multiplying by
_hbase.memstore.inmemoryflush.threshold.factor_ (default is 0.1).
The number of flushes carried by the pipeline is monitored so as to fit within the bounds of memstore sizing
but you can also set a maximum on the number of flushes total by setting
_hbase.hregion.compacting.pipeline.segments.limit_. Default is 4.
When a column family Store is created, it says what memstore type is in effect. As of this writing
there is the old-school _DefaultMemStore_ which fills a _ConcurrentSkipListMap_ and then flushes
to disk or the new _CompactingMemStore_ that is the implementation that provides this new
in-memory compactions facility. Here is a log-line from a RegionServer that shows a column
family Store named _family_ configured to use a _CompactingMemStore_:
----
Note how the IN_MEMORY_COMPACTION attribute shows as part of the _METADATA_ map.
2018-03-30 11:02:24,466 INFO [Time-limited test] regionserver.HStore(325): Store=family, memstore type=CompactingMemStore, storagePolicy=HOT, verifyBulkLoads=false, parallelPutCountPrintThreshold=10
----
Enable TRACE-level logging on the CompactingMemStore class (_org.apache.hadoop.hbase.regionserver.CompactingMemStore_) to see detail on its operation.

View File

@ -62,6 +62,7 @@ include::_chapters/mapreduce.adoc[]
include::_chapters/security.adoc[] include::_chapters/security.adoc[]
include::_chapters/architecture.adoc[] include::_chapters/architecture.adoc[]
include::_chapters/hbase_mob.adoc[] include::_chapters/hbase_mob.adoc[]
include::_chapters/inmemory_compaction.adoc[]
include::_chapters/backup_restore.adoc[] include::_chapters/backup_restore.adoc[]
include::_chapters/hbase_apis.adoc[] include::_chapters/hbase_apis.adoc[]
include::_chapters/external_apis.adoc[] include::_chapters/external_apis.adoc[]