HBASE-4241 Optimize flushing of the Memstore

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1161634 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-08-25 16:35:15 +00:00
parent 5f77151142
commit 7ede7e9b39
8 changed files with 181 additions and 141 deletions

View File

@ -403,6 +403,7 @@ Release 0.91.0 - Unreleased
HBASE-4199 blockCache summary - backend (Doug Meil)
HBASE-4240 Allow Loadbalancer to be pluggable
HBASE-4244 Refactor bin/hbase help
HBASE-4241 Optimize flushing of the Memstore (Lars Hofhansl)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
@ -90,6 +91,7 @@ public class Store implements HeapSize {
// ttl in milliseconds.
protected long ttl;
protected int minVersions;
protected int maxVersions;
long majorCompactionTime;
private final int minFilesToCompact;
private final int maxFilesToCompact;
@ -178,6 +180,7 @@ public class Store implements HeapSize {
this.ttl *= 1000;
}
this.minVersions = family.getMinVersions();
this.maxVersions = family.getMaxVersions();
this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = Bytes.toString(this.family.getName());
@ -481,7 +484,15 @@ public class Store implements HeapSize {
if (set.size() == 0) {
return null;
}
long oldestTimestamp = System.currentTimeMillis() - ttl;
Scan scan = new Scan();
scan.setMaxVersions(maxVersions);
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// pass true as the StoreScanner's retainDeletesInOutput argument.
InternalScanner scanner = new StoreScanner(this, scan,
Collections.singletonList(new CollectionBackedScanner(set,
this.comparator)), true);
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
@ -491,15 +502,15 @@ public class Store implements HeapSize {
writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
try {
for (KeyValue kv: set) {
// If minVersion > 0 we will wait until the next compaction to
// collect expired KVs. (following the logic for maxVersions).
// TODO: As Jonathan Gray points this can be optimized
// (see HBASE-4241)
if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
while (scanner.next(kvs)) {
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
writer.append(kv);
flushed += this.memstore.heapSizeChange(kv, true);
}
kvs.clear();
}
}
} finally {
// Write out the log sequence number that corresponds to this output
@ -510,6 +521,9 @@ public class Store implements HeapSize {
writer.close();
}
}
} finally {
scanner.close();
}
// Write-out finished successfully, move into the right spot
Path dstPath = StoreFile.getUniqueFile(fs, homedir);
@ -1734,7 +1748,7 @@ public class Store implements HeapSize {
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
(8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
(5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
(6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +

View File

@ -0,0 +1,129 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
/**
* Utility scanner that wraps a sortable collection and serves
* as a KeyValueScanner.
*/
public class CollectionBackedScanner implements KeyValueScanner {
final private Iterable<KeyValue> data;
final KeyValue.KVComparator comparator;
private Iterator<KeyValue> iter;
private KeyValue current;
public CollectionBackedScanner(SortedSet<KeyValue> set) {
this(set, KeyValue.COMPARATOR);
}
public CollectionBackedScanner(SortedSet<KeyValue> set,
KeyValue.KVComparator comparator) {
this.comparator = comparator;
data = set;
init();
}
public CollectionBackedScanner(List<KeyValue> list) {
this(list, KeyValue.COMPARATOR);
}
public CollectionBackedScanner(List<KeyValue> list,
KeyValue.KVComparator comparator) {
Collections.sort(list, comparator);
this.comparator = comparator;
data = list;
init();
}
public CollectionBackedScanner(KeyValue.KVComparator comparator,
KeyValue... array) {
this.comparator = comparator;
List<KeyValue> tmp = new ArrayList<KeyValue>(array.length);
for( int i = 0; i < array.length ; ++i) {
tmp.add(array[i]);
}
Collections.sort(tmp, comparator);
data = tmp;
init();
}
private void init() {
iter = data.iterator();
if(iter.hasNext()){
current = iter.next();
}
}
@Override
public KeyValue peek() {
return current;
}
@Override
public KeyValue next() {
KeyValue oldCurrent = current;
if(iter.hasNext()){
current = iter.next();
} else {
current = null;
}
return oldCurrent;
}
@Override
public boolean seek(KeyValue seekKv) {
// restart iterator
iter = data.iterator();
return reseek(seekKv);
}
@Override
public boolean reseek(KeyValue seekKv) {
while(iter.hasNext()){
KeyValue next = iter.next();
int ret = comparator.compare(next, seekKv);
if(ret >= 0){
current = next;
return true;
}
}
return false;
}
@Override
public long getSequenceID() {
return 0;
}
@Override
public void close() {
// do nothing
}
}

View File

@ -21,11 +21,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Collections;
import java.util.List;
/**
@ -34,21 +33,10 @@ import java.util.List;
* to the provided comparator, and then the whole thing pretends
* to be a store file scanner.
*/
public class KeyValueScanFixture implements KeyValueScanner {
ArrayList<KeyValue> data;
Iterator<KeyValue> iter = null;
KeyValue current = null;
KeyValue.KVComparator comparator;
public class KeyValueScanFixture extends CollectionBackedScanner {
public KeyValueScanFixture(KeyValue.KVComparator comparator,
KeyValue... incData) {
this.comparator = comparator;
data = new ArrayList<KeyValue>(incData.length);
for( int i = 0; i < incData.length ; ++i) {
data.add(incData[i]);
}
Collections.sort(data, this.comparator);
super(comparator, incData);
}
public static List<KeyValueScanner> scanFixture(KeyValue[] ... kvArrays) {
@ -58,54 +46,4 @@ public class KeyValueScanFixture implements KeyValueScanner {
}
return scanners;
}
@Override
public KeyValue peek() {
return this.current;
}
@Override
public KeyValue next() {
KeyValue res = current;
if (iter.hasNext())
current = iter.next();
else
current = null;
return res;
}
@Override
public boolean seek(KeyValue key) {
// start at beginning.
iter = data.iterator();
int cmp;
KeyValue kv = null;
do {
if (!iter.hasNext()) {
current = null;
return false;
}
kv = iter.next();
cmp = comparator.compare(key, kv);
} while (cmp > 0);
current = kv;
return true;
}
@Override
public boolean reseek(KeyValue key) {
return seek(key);
}
@Override
public void close() {
// noop.
}
@Override
public long getSequenceID() {
return 0;
}
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
public class TestKeyValueHeap extends HBaseTestCase {
@ -208,31 +209,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
}
}
private static class Scanner implements KeyValueScanner {
private Iterator<KeyValue> iter;
private KeyValue current;
private static class Scanner extends CollectionBackedScanner {
private boolean closed = false;
public Scanner(List<KeyValue> list) {
Collections.sort(list, KeyValue.COMPARATOR);
iter = list.iterator();
if(iter.hasNext()){
current = iter.next();
}
}
public KeyValue peek() {
return current;
}
public KeyValue next() {
KeyValue oldCurrent = current;
if(iter.hasNext()){
current = iter.next();
} else {
current = null;
}
return oldCurrent;
super(list);
}
public void close(){
@ -242,28 +223,6 @@ public class TestKeyValueHeap extends HBaseTestCase {
public boolean isClosed() {
return closed;
}
public boolean seek(KeyValue seekKv) {
while(iter.hasNext()){
KeyValue next = iter.next();
int ret = KeyValue.COMPARATOR.compare(next, seekKv);
if(ret >= 0){
current = next;
return true;
}
}
return false;
}
@Override
public boolean reseek(KeyValue key) throws IOException {
return seek(key);
}
@Override
public long getSequenceID() {
return 0;
}
}
}

View File

@ -42,8 +42,6 @@ public class TestKeyValueScanFixture extends TestCase {
KeyValueScanner scan = new KeyValueScanFixture(
KeyValue.COMPARATOR, kvs);
// test simple things.
assertNull(scan.peek());
KeyValue kv = KeyValue.createFirstOnRow(Bytes.toBytes("RowA"));
// should seek to this:
assertTrue(scan.seek(kv));

View File

@ -315,14 +315,12 @@ public class TestMinVersions extends HBaseTestCase {
// now flush
region.flushcache();
region.compactStores();
// oldest version still exists
// flushing/minor compactions can't get rid of these, anymore
// with HBASE-4241 a flush will eliminate the expired rows
g = new Get(T1);
g.setTimeRange(0L, ts-2);
r = region.get(g, null);
checkResult(r, c0, T1);
assertTrue(r.isEmpty());
// major compaction
region.compactStores(true);

View File

@ -122,6 +122,9 @@ public class TestStore extends TestCase {
Path logdir = new Path(DIR+methodName+"/logs");
Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
HColumnDescriptor hcd = new HColumnDescriptor(family);
// some of the tests write 4 versions and then flush
// (with HBASE-4241, lower versions are collected on flush)
hcd.setMaxVersions(4);
FileSystem fs = FileSystem.get(conf);
fs.delete(logdir, true);