HBASE-7679 implement store file management for stripe compactions
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536567 13f79535-47bb-0310-9956-ffa450edef68
commit 19fd1a3310
parent 5a831da632
@@ -2345,10 +2345,25 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
   private boolean matchingRows(final KeyValue left, final short lrowlength,
       final KeyValue right, final short rrowlength) {
     return lrowlength == rrowlength &&
-        Bytes.equals(left.getBuffer(), left.getRowOffset(), lrowlength,
+        matchingRows(left.getBuffer(), left.getRowOffset(), lrowlength,
             right.getBuffer(), right.getRowOffset(), rrowlength);
   }

+  /**
+   * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
+   * @param left Left row array.
+   * @param loffset Left row offset.
+   * @param llength Left row length.
+   * @param right Right row array.
+   * @param roffset Right row offset.
+   * @param rlength Right row length.
+   * @return Whether rows are the same row.
+   */
+  public boolean matchingRows(final byte [] left, final int loffset, final int llength,
+      final byte [] right, final int roffset, final int rlength) {
+    return Bytes.equals(left, loffset, llength, right, roffset, rlength);
+  }
+
   public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
     byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
     if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
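The new matchingRows overload above only encapsulates Bytes.equals, but it lets callers compare raw row bytes through the comparator without constructing KeyValue objects. A minimal usage sketch, assuming the overload lands on KeyValue.KVComparator (as the later kvComparator.matchingRows(...) calls in StripeStoreFileManager suggest) and that the standard KeyValue.COMPARATOR instance is used; the row values are hypothetical:

  // Compare two raw row keys via the comparator, mirroring how
  // StripeStoreFileManager.rowEquals() uses this overload further down.
  byte[] k1 = Bytes.toBytes("row-a");
  byte[] k2 = Bytes.toBytes("row-a");
  boolean same = KeyValue.COMPARATOR.matchingRows(k1, 0, k1.length, k2, 0, k2.length); // true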
@@ -0,0 +1,166 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A collection class that contains multiple sub-lists, which allows us to not copy lists.
+ * This class does not support modification. The derived classes that add modifications are
+ * not thread-safe.
+ * NOTE: Doesn't implement list as it is not necessary for current usage, feel free to add.
+ */
+public class ConcatenatedLists<T> implements Collection<T> {
+  protected final ArrayList<List<T>> components = new ArrayList<List<T>>();
+  protected int size = 0;
+
+  public void addAllSublists(List<? extends List<T>> items) {
+    for (List<T> list : items) {
+      addSublist(list);
+    }
+  }
+
+  public void addSublist(List<T> items) {
+    if (!items.isEmpty()) {
+      this.components.add(items);
+      this.size += items.size();
+    }
+  }
+
+  @Override
+  public int size() {
+    return this.size;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return this.size == 0;
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    for (List<T> component : this.components) {
+      if (component.contains(o)) return true;
+    }
+    return false;
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    for (Object o : c) {
+      if (!contains(o)) return false;
+    }
+    return true;
+  }
+
+  @Override
+  public Object[] toArray() {
+    return toArray((Object[])Array.newInstance(Object.class, this.size));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <U> U[] toArray(U[] a) {
+    U[] result = (a.length == this.size()) ? a
+        : (U[])Array.newInstance(a.getClass().getComponentType(), this.size);
+    int i = 0;
+    for (List<T> component : this.components) {
+      for (T t : component) {
+        result[i] = (U)t;
+        ++i;
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public boolean add(T e) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> c) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public java.util.Iterator<T> iterator() {
+    return new Iterator();
+  }
+
+  public class Iterator implements java.util.Iterator<T> {
+    protected int currentComponent = 0;
+    protected int indexWithinComponent = -1;
+    protected boolean nextWasCalled = false;
+
+    @Override
+    public boolean hasNext() {
+      return (currentComponent + 1) < components.size()
+          || ((currentComponent + 1) == components.size()
+              && ((indexWithinComponent + 1) < components.get(currentComponent).size()));
+    }
+
+    @Override
+    public T next() {
+      if (!components.isEmpty()) {
+        this.nextWasCalled = true;
+        List<T> src = components.get(currentComponent);
+        if (++indexWithinComponent < src.size()) return src.get(indexWithinComponent);
+        if (++currentComponent < components.size()) {
+          indexWithinComponent = 0;
+          src = components.get(currentComponent);
+          assert src.size() > 0;
+          return src.get(indexWithinComponent);
+        }
+      }
+      this.nextWasCalled = false;
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
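A short usage sketch for ConcatenatedLists, grounded in the class above: sub-lists are registered once and then viewed as a single logical collection, with no element copying (the values are arbitrary; assumes java.util.Arrays is imported):

  // Three sub-lists exposed as one read-only collection.
  ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
  c.addSublist(Arrays.asList(0L, 1L));
  c.addAllSublists(Arrays.asList(Arrays.asList(2L), Arrays.asList(3L, 4L)));
  for (Long v : c) {
    System.out.println(v); // prints 0 through 4, in sub-list insertion order
  }
  // Mutators such as add()/clear() throw UnsupportedOperationException.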
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.*;
+
+@Category(SmallTests.class)
+public class TestConcatenatedLists {
+  @Test
+  public void testUnsupportedOps() {
+    // If adding support, add tests.
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L, 1L));
+    try {
+      c.add(2L);
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.addAll(Arrays.asList(2L, 3L));
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.remove(0L);
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.removeAll(Arrays.asList(0L, 1L));
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.clear();
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    try {
+      c.retainAll(Arrays.asList(0L, 1L));
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+    Iterator<Long> iter = c.iterator();
+    iter.next();
+    try {
+      iter.remove();
+      fail("Should throw");
+    } catch (UnsupportedOperationException ex) {
+    }
+  }
+
+  @Test
+  public void testEmpty() {
+    verify(new ConcatenatedLists<Long>(), -1);
+  }
+
+  @Test
+  public void testOneOne() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L));
+    verify(c, 0);
+  }
+
+  @Test
+  public void testOneMany() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L, 1L, 2L));
+    verify(c, 2);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testManyOne() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addSublist(Arrays.asList(0L));
+    c.addAllSublists(Arrays.asList(Arrays.asList(1L), Arrays.asList(2L)));
+    verify(c, 2);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testManyMany() {
+    ConcatenatedLists<Long> c = new ConcatenatedLists<Long>();
+    c.addAllSublists(Arrays.asList(Arrays.asList(0L, 1L)));
+    c.addSublist(Arrays.asList(2L, 3L, 4L));
+    c.addAllSublists(Arrays.asList(Arrays.asList(5L), Arrays.asList(6L, 7L)));
+    verify(c, 7);
+  }
+
+  private void verify(ConcatenatedLists<Long> c, int last) {
+    assertEquals((last == -1), c.isEmpty());
+    assertEquals(last + 1, c.size());
+    assertTrue(c.containsAll(c));
+    Long[] array = c.toArray(new Long[0]);
+    List<Long> all = new ArrayList<Long>();
+    Iterator<Long> iter = c.iterator();
+    for (Long i = 0L; i <= last; ++i) {
+      assertTrue(iter.hasNext());
+      assertEquals(i, iter.next());
+      assertEquals(i, array[i.intValue()]);
+      assertTrue(c.contains(i));
+      assertTrue(c.containsAll(Arrays.asList(i)));
+      all.add(i);
+    }
+    assertTrue(c.containsAll(all));
+    assertFalse(iter.hasNext());
+    try {
+      iter.next();
+      fail("Should have thrown");
+    } catch (NoSuchElementException nsee) {
+    }
+  }
+}
@@ -285,6 +285,10 @@ public class StoreFile {
     return modificationTimeStamp;
   }

+  byte[] getMetadataValue(byte[] key) {
+    return metadataMap.get(key);
+  }
+
   /**
    * Return the largest memstoreTS found across all storefiles in
    * the given list. Store files that were created by a mapreduce
@@ -58,8 +58,7 @@ public interface StoreFileManager {
    * @param results The resulting files for the compaction.
    */
   void addCompactionResults(
-      Collection<StoreFile> compactedFiles, Collection<StoreFile> results
-  );
+      Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;

   /**
    * Clears all the files currently in use and returns them.
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Configuration class for stripe store and compactions.
+ * See {@link StripeStoreFileManager} for general documentation.
+ * See getters for the description of each setting.
+ */
+@InterfaceAudience.Private
+public class StripeStoreConfig {
+  public static final String MAX_SPLIT_IMBALANCE = "hbase.store.stripe.split.max.imbalance";
+  private float maxSplitImbalance;
+
+  public StripeStoreConfig(Configuration config) {
+    maxSplitImbalance = config.getFloat(MAX_SPLIT_IMBALANCE, 1.5f);
+    if (maxSplitImbalance == 0) {
+      maxSplitImbalance = 1.5f;
+    }
+    if (maxSplitImbalance < 1f) {
+      maxSplitImbalance = 1f / maxSplitImbalance;
+    }
+  }
+
+  /**
+   * @return the maximum imbalance to tolerate between sides when splitting the region
+   *         at the stripe boundary. If the ratio of a larger to a smaller side of the split on
+   *         the stripe-boundary is bigger than this, then some stripe will be split.
+   */
+  public float getMaxSplitImbalance() {
+    return this.maxSplitImbalance;
+  }
+}
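The StripeStoreConfig constructor normalizes the configured ratio so either side of the imbalance can be specified: zero falls back to the 1.5 default, and a value below 1 is inverted. A small illustration, assuming a plain Hadoop Configuration (the value is hypothetical):

  Configuration conf = new Configuration();
  conf.setFloat(StripeStoreConfig.MAX_SPLIT_IMBALANCE, 0.5f); // "smaller/larger" form
  StripeStoreConfig cfg = new StripeStoreConfig(conf);
  // 0.5f < 1f, so the constructor inverts it: cfg.getMaxSplitImbalance() == 2.0f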
@ -0,0 +1,862 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ConcatenatedLists;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableCollection;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stripe implementation of StoreFileManager.
|
||||||
|
* Not thread safe - relies on external locking (in HStore). Collections that this class
|
||||||
|
* returns are immutable or unique to the call, so they should be safe.
|
||||||
|
* Stripe store splits the key space of the region into non-overlapping stripes, as well as
|
||||||
|
* some recent files that have all the keys (level 0). Each stripe contains a set of files.
|
||||||
|
* When L0 is compacted, it's split into the files corresponding to existing stripe boundaries,
|
||||||
|
* that can thus be added to stripes.
|
||||||
|
* When scan or get happens, it only has to read the files from the corresponding stripes.
|
||||||
|
* See StripeCompationPolicy on how the stripes are determined; this class doesn't care.
|
||||||
|
*
|
||||||
|
* This class should work together with StripeCompactionPolicy and StripeCompactor.
|
||||||
|
* With regard to how they work, we make at least the following (reasonable) assumptions:
|
||||||
|
* - Compaction produces one file per new stripe (if any); that is easy to change.
|
||||||
|
* - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class StripeStoreFileManager implements StoreFileManager {
|
||||||
|
static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The file metadata fields that contain the stripe information.
|
||||||
|
*/
|
||||||
|
public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
|
||||||
|
public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
|
||||||
|
|
||||||
|
private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The key value used for range boundary, indicating that the boundary is open (i.e. +-inf).
|
||||||
|
*/
|
||||||
|
public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
|
||||||
|
final static byte[] INVALID_KEY = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The state class. Used solely to replace results atomically during
|
||||||
|
* compactions and avoid complicated error handling.
|
||||||
|
*/
|
||||||
|
private static class State {
|
||||||
|
/**
|
||||||
|
* The end keys of each stripe. The last stripe end is always open-ended, so it's not stored
|
||||||
|
* here. It is invariant that the start key of the stripe is the end key of the previous one
|
||||||
|
* (and is an open boundary for the first one).
|
||||||
|
*/
|
||||||
|
public byte[][] stripeEndRows = new byte[0][];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Files by stripe. Each element of the list corresponds to stripeEndKey with the corresponding
|
||||||
|
* index, except the last one. Inside each list, the files are in reverse order by seqNum.
|
||||||
|
* Note that the length of this is one higher than that of stripeEndKeys.
|
||||||
|
*/
|
||||||
|
public ArrayList<ImmutableList<StoreFile>> stripeFiles
|
||||||
|
= new ArrayList<ImmutableList<StoreFile>>();
|
||||||
|
/** Level 0. The files are in reverse order by seqNum. */
|
||||||
|
public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
|
||||||
|
|
||||||
|
/** Cached list of all files in the structure, to return from some calls */
|
||||||
|
public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
|
||||||
|
}
|
||||||
|
private State state = null;
|
||||||
|
|
||||||
|
/** Cached file metadata (or overrides as the case may be) */
|
||||||
|
private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
|
||||||
|
private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
|
||||||
|
|
||||||
|
private final KVComparator kvComparator;
|
||||||
|
private StripeStoreConfig config;
|
||||||
|
|
||||||
|
private final int blockingFileCount;
|
||||||
|
|
||||||
|
public StripeStoreFileManager(KVComparator kvComparator, Configuration conf) throws Exception {
|
||||||
|
this.kvComparator = kvComparator;
|
||||||
|
// TODO: create this in a shared manner in StoreEngine when there's one
|
||||||
|
this.config = new StripeStoreConfig(conf);
|
||||||
|
this.blockingFileCount = conf.getInt(
|
||||||
|
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void loadFiles(List<StoreFile> storeFiles) {
|
||||||
|
loadUnclassifiedStoreFiles(storeFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<StoreFile> getStorefiles() {
|
||||||
|
return state.allFilesCached;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void insertNewFile(StoreFile sf) {
|
||||||
|
LOG.debug("New level 0 file: " + sf);
|
||||||
|
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(state.level0Files);
|
||||||
|
insertFileIntoStripe(newFiles, sf);
|
||||||
|
ensureLevel0Metadata(sf);
|
||||||
|
this.state.level0Files = ImmutableList.copyOf(newFiles);
|
||||||
|
ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(state.allFilesCached);
|
||||||
|
newAllFiles.add(sf);
|
||||||
|
this.state.allFilesCached = ImmutableList.copyOf(newAllFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ImmutableCollection<StoreFile> clearFiles() {
|
||||||
|
ImmutableCollection<StoreFile> result = state.allFilesCached;
|
||||||
|
this.state = new State();
|
||||||
|
this.fileStarts.clear();
|
||||||
|
this.fileEnds.clear();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getStorefileCount() {
|
||||||
|
return state.allFilesCached.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)}
|
||||||
|
* for details on this methods. */
|
||||||
|
@Override
|
||||||
|
public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
|
||||||
|
KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
|
||||||
|
// Order matters for this call.
|
||||||
|
result.addSublist(state.level0Files);
|
||||||
|
if (!state.stripeFiles.isEmpty()) {
|
||||||
|
int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
|
||||||
|
for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
|
||||||
|
result.addSublist(state.stripeFiles.get(stripeIndex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** See {@link StoreFileManager#getCandidateFilesForRowKeyBefore(KeyValue)} and
|
||||||
|
* {@link StoreFileManager#updateCandidateFilesForRowKeyBefore(Iterator, KeyValue, KeyValue)}
|
||||||
|
* for details on this methods. */
|
||||||
|
@Override
|
||||||
|
public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
|
||||||
|
Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final KeyValue candidate) {
|
||||||
|
KeyBeforeConcatenatedLists.Iterator original =
|
||||||
|
(KeyBeforeConcatenatedLists.Iterator)candidateFiles;
|
||||||
|
assert original != null;
|
||||||
|
ArrayList<List<StoreFile>> components = original.getComponents();
|
||||||
|
for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
|
||||||
|
StoreFile sf = components.get(firstIrrelevant).get(0);
|
||||||
|
byte[] endKey = endOf(sf);
|
||||||
|
// Entries are ordered as such: L0, then stripes in reverse order. We never remove
|
||||||
|
// level 0; we remove the stripe, and all subsequent ones, as soon as we find the
|
||||||
|
// first one that cannot possibly have better candidates.
|
||||||
|
if (!isInvalid(endKey) && !isOpen(endKey)
|
||||||
|
&& (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
|
||||||
|
original.removeComponents(firstIrrelevant);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return original;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
/**
|
||||||
|
* Override of getSplitPoint that determines the split point as the boundary between two
|
||||||
|
* stripes, unless it causes significant imbalance between split sides' sizes. In that
|
||||||
|
* case, the split boundary will be chosen from the middle of one of the stripes to
|
||||||
|
* minimize imbalance.
|
||||||
|
* @return The split point, or null if no split is possible.
|
||||||
|
*/
|
||||||
|
public byte[] getSplitPoint() throws IOException {
|
||||||
|
if (this.getStorefileCount() == 0) return null;
|
||||||
|
if (state.stripeFiles.size() <= 1) {
|
||||||
|
return getSplitPointFromAllFiles();
|
||||||
|
}
|
||||||
|
int leftIndex = -1, rightIndex = state.stripeFiles.size();
|
||||||
|
long leftSize = 0, rightSize = 0;
|
||||||
|
long lastLeftSize = 0, lastRightSize = 0;
|
||||||
|
while (rightIndex - 1 != leftIndex) {
|
||||||
|
if (leftSize >= rightSize) {
|
||||||
|
--rightIndex;
|
||||||
|
lastRightSize = getStripeFilesSize(rightIndex);
|
||||||
|
rightSize += lastRightSize;
|
||||||
|
} else {
|
||||||
|
++leftIndex;
|
||||||
|
lastLeftSize = getStripeFilesSize(leftIndex);
|
||||||
|
leftSize += lastLeftSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (leftSize == 0 || rightSize == 0) {
|
||||||
|
String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
|
||||||
|
+ "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
|
||||||
|
debugDumpState(errMsg);
|
||||||
|
LOG.warn(errMsg);
|
||||||
|
return getSplitPointFromAllFiles();
|
||||||
|
}
|
||||||
|
double ratio = (double)rightSize / leftSize;
|
||||||
|
if (ratio < 1) {
|
||||||
|
ratio = 1 / ratio;
|
||||||
|
}
|
||||||
|
if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
|
||||||
|
|
||||||
|
// If the difference between the sides is too large, we could get the proportional key on
|
||||||
|
// the a stripe to equalize the difference, but there's no proportional key method at the
|
||||||
|
// moment, and it's not extremely important.
|
||||||
|
// See if we can achieve better ratio if we split the bigger side in half.
|
||||||
|
boolean isRightLarger = rightSize >= leftSize;
|
||||||
|
double newRatio = isRightLarger
|
||||||
|
? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
|
||||||
|
: getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
|
||||||
|
if (newRatio < 1) {
|
||||||
|
newRatio = 1 / newRatio;
|
||||||
|
}
|
||||||
|
if (newRatio >= ratio) return state.stripeEndRows[leftIndex];
|
||||||
|
LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
|
||||||
|
+ newRatio + " configured ratio " + config.getMaxSplitImbalance());
|
||||||
|
// Ok, we may get better ratio, get it.
|
||||||
|
return StoreUtils.getLargestFile(state.stripeFiles.get(
|
||||||
|
isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getSplitPointFromAllFiles() throws IOException {
|
||||||
|
ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
|
||||||
|
sfs.addSublist(state.level0Files);
|
||||||
|
sfs.addAllSublists(state.stripeFiles);
|
||||||
|
if (sfs.isEmpty()) return null;
|
||||||
|
return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
|
||||||
|
}
|
||||||
|
|
||||||
|
private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
|
||||||
|
return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<StoreFile> getFilesForScanOrGet(
|
||||||
|
boolean isGet, byte[] startRow, byte[] stopRow) {
|
||||||
|
if (state.stripeFiles.isEmpty()) {
|
||||||
|
return state.level0Files; // There's just L0.
|
||||||
|
}
|
||||||
|
|
||||||
|
int firstStripe = findStripeForRow(startRow, true);
|
||||||
|
int lastStripe = findStripeForRow(stopRow, false);
|
||||||
|
assert firstStripe <= lastStripe;
|
||||||
|
if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
|
||||||
|
return state.stripeFiles.get(firstStripe); // There's just one stripe we need.
|
||||||
|
}
|
||||||
|
if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
|
||||||
|
return state.allFilesCached; // We need to read all files.
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
|
||||||
|
result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
|
||||||
|
result.addSublist(state.level0Files);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addCompactionResults(
|
||||||
|
Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
|
||||||
|
// See class comment for the assumptions we make here.
|
||||||
|
LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
|
||||||
|
+ " files replaced by " + results.size());
|
||||||
|
// In order to be able to fail in the middle of the operation, we'll operate on lazy
|
||||||
|
// copies and apply the result at the end.
|
||||||
|
CompactionResultsMergeCopy cmc = new CompactionResultsMergeCopy();
|
||||||
|
cmc.mergeResults(compactedFiles, results);
|
||||||
|
debugDumpState("Merged compaction results");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getStoreCompactionPriority() {
|
||||||
|
// If there's only L0, do what the default store does.
|
||||||
|
// If we are in critical priority, do the same - we don't want to trump all stores all
|
||||||
|
// the time due to how many files we have.
|
||||||
|
int fc = getStorefileCount();
|
||||||
|
if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
|
||||||
|
return this.blockingFileCount - fc;
|
||||||
|
}
|
||||||
|
// If we are in good shape, we don't want to be trumped by all other stores due to how
|
||||||
|
// many files we have, so do an approximate mapping to normal priority range; L0 counts
|
||||||
|
// for all stripes.
|
||||||
|
int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
|
||||||
|
int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
|
||||||
|
return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the total size of all files in the stripe.
|
||||||
|
* @param stripeIndex Stripe index.
|
||||||
|
* @return Size.
|
||||||
|
*/
|
||||||
|
private long getStripeFilesSize(int stripeIndex) {
|
||||||
|
long result = 0;
|
||||||
|
for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
|
||||||
|
result += sf.getReader().length();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Loads initial store files that were picked up from some physical location pertaining to
|
||||||
|
* this store (presumably). Unlike adding files after compaction, assumes empty initial
|
||||||
|
* sets, and is forgiving with regard to stripe constraints - at worst, many/all files will
|
||||||
|
* go to level 0.
|
||||||
|
* @param storeFiles Store files to add.
|
||||||
|
*/
|
||||||
|
private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
|
||||||
|
LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
|
||||||
|
TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
|
||||||
|
new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
|
||||||
|
ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
|
||||||
|
// Separate the files into tentative stripes; then validate. Currently, we rely on metadata.
|
||||||
|
// If needed, we could dynamically determine the stripes in future.
|
||||||
|
for (StoreFile sf : storeFiles) {
|
||||||
|
byte[] startRow = startOf(sf), endRow = endOf(sf);
|
||||||
|
// Validate the range and put the files into place.
|
||||||
|
if (isInvalid(startRow) || isInvalid(endRow)) {
|
||||||
|
insertFileIntoStripe(level0Files, sf); // No metadata - goes to L0.
|
||||||
|
ensureLevel0Metadata(sf);
|
||||||
|
} else if (!isOpen(startRow) && !isOpen(endRow) &&
|
||||||
|
nonOpenRowCompare(startRow, endRow) >= 0) {
|
||||||
|
LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
|
||||||
|
+ Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
|
||||||
|
insertFileIntoStripe(level0Files, sf); // Bad metadata - goes to L0 also.
|
||||||
|
ensureLevel0Metadata(sf);
|
||||||
|
} else {
|
||||||
|
ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
|
||||||
|
if (stripe == null) {
|
||||||
|
stripe = new ArrayList<StoreFile>();
|
||||||
|
candidateStripes.put(endRow, stripe);
|
||||||
|
}
|
||||||
|
insertFileIntoStripe(stripe, sf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Possible improvement - for variable-count stripes, if all the files are in L0, we can
|
||||||
|
// instead create single, open-ended stripe with all files.
|
||||||
|
|
||||||
|
boolean hasOverlaps = false;
|
||||||
|
byte[] expectedStartRow = null; // first stripe can start wherever
|
||||||
|
Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
|
||||||
|
candidateStripes.entrySet().iterator();
|
||||||
|
while (entryIter.hasNext()) {
|
||||||
|
Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
|
||||||
|
ArrayList<StoreFile> files = entry.getValue();
|
||||||
|
// Validate the file start keys, and remove the bad ones to level 0.
|
||||||
|
for (int i = 0; i < files.size(); ++i) {
|
||||||
|
StoreFile sf = files.get(i);
|
||||||
|
byte[] startRow = startOf(sf);
|
||||||
|
if (expectedStartRow == null) {
|
||||||
|
expectedStartRow = startRow; // ensure that first stripe is still consistent
|
||||||
|
} else if (!rowEquals(expectedStartRow, startRow)) {
|
||||||
|
hasOverlaps = true;
|
||||||
|
LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
|
||||||
|
+ Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
|
||||||
|
+ "], to L0 it goes");
|
||||||
|
StoreFile badSf = files.remove(i);
|
||||||
|
insertFileIntoStripe(level0Files, badSf);
|
||||||
|
ensureLevel0Metadata(badSf);
|
||||||
|
--i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check if any files from the candidate stripe are valid. If so, add a stripe.
|
||||||
|
byte[] endRow = entry.getKey();
|
||||||
|
if (!files.isEmpty()) {
|
||||||
|
expectedStartRow = endRow; // Next stripe must start exactly at that key.
|
||||||
|
} else {
|
||||||
|
entryIter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// In the end, there must be open ends on two sides. If not, and there were no errors i.e.
|
||||||
|
// files are consistent, they might be coming from a split. We will treat the boundaries
|
||||||
|
// as open keys anyway, and log the message.
|
||||||
|
// If there were errors, we'll play it safe and dump everything into L0.
|
||||||
|
if (!candidateStripes.isEmpty()) {
|
||||||
|
StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
|
||||||
|
boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
|
||||||
|
if (!isOpen) {
|
||||||
|
LOG.warn("The range of the loaded files does not cover full key space: from ["
|
||||||
|
+ Bytes.toString(startOf(firstFile)) + "], to ["
|
||||||
|
+ Bytes.toString(candidateStripes.lastKey()) + "]");
|
||||||
|
if (!hasOverlaps) {
|
||||||
|
ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
|
||||||
|
ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Inconsistent files, everything goes to L0.");
|
||||||
|
for (ArrayList<StoreFile> files : candidateStripes.values()) {
|
||||||
|
for (StoreFile sf : files) {
|
||||||
|
insertFileIntoStripe(level0Files, sf);
|
||||||
|
ensureLevel0Metadata(sf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
candidateStripes.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the results into the fields.
|
||||||
|
State state = new State();
|
||||||
|
state.level0Files = ImmutableList.copyOf(level0Files);
|
||||||
|
state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
|
||||||
|
state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
|
||||||
|
ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
|
||||||
|
int i = candidateStripes.size() - 1;
|
||||||
|
for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
|
||||||
|
state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
|
||||||
|
newAllFiles.addAll(entry.getValue());
|
||||||
|
if (i > 0) {
|
||||||
|
state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
|
||||||
|
}
|
||||||
|
--i;
|
||||||
|
}
|
||||||
|
state.allFilesCached = ImmutableList.copyOf(newAllFiles);
|
||||||
|
this.state = state;
|
||||||
|
debugDumpState("Files loaded");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
|
||||||
|
HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
|
||||||
|
for (StoreFile sf : stripe) {
|
||||||
|
targetMap.put(sf, OPEN_KEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ensureLevel0Metadata(StoreFile sf) {
|
||||||
|
if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, null);
|
||||||
|
if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For testing.
|
||||||
|
*/
|
||||||
|
List<StoreFile> getLevel0Files() {
|
||||||
|
return state.level0Files;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void debugDumpState(String string) {
|
||||||
|
if (!LOG.isDebugEnabled()) return;
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("\n" + string + "; current stripe state is as such:");
|
||||||
|
sb.append("\n level 0 with ").append(state.level0Files.size()).append(" files;");
|
||||||
|
for (int i = 0; i < state.stripeFiles.size(); ++i) {
|
||||||
|
String endRow = (i == state.stripeEndRows.length)
|
||||||
|
? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
|
||||||
|
sb.append("\n stripe ending in ").append(endRow).append(" with ")
|
||||||
|
.append(state.stripeFiles.get(i).size()).append(" files;");
|
||||||
|
}
|
||||||
|
sb.append("\n").append(getStorefileCount()).append(" files total.");
|
||||||
|
LOG.debug(sb.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether the key indicates an open interval boundary (i.e. infinity).
|
||||||
|
*/
|
||||||
|
private static final boolean isOpen(byte[] key) {
|
||||||
|
return key != null && key.length == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether the key is invalid (e.g. from an L0 file, or non-stripe-compacted files).
|
||||||
|
*/
|
||||||
|
private static final boolean isInvalid(byte[] key) {
|
||||||
|
return key == INVALID_KEY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare two keys for equality.
|
||||||
|
*/
|
||||||
|
private final boolean rowEquals(byte[] k1, byte[] k2) {
|
||||||
|
return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare two keys. Keys must not be open (isOpen(row) == false).
|
||||||
|
*/
|
||||||
|
private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
|
||||||
|
assert !isOpen(k1) && !isOpen(k2);
|
||||||
|
return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds the stripe index by end key.
|
||||||
|
*/
|
||||||
|
private final int findStripeIndexByEndRow(byte[] endRow) {
|
||||||
|
assert !isInvalid(endRow);
|
||||||
|
if (isOpen(endRow)) return state.stripeEndRows.length;
|
||||||
|
return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds the stripe index for the stripe containing a key provided externally for get/scan.
|
||||||
|
*/
|
||||||
|
private final int findStripeForRow(byte[] row, boolean isStart) {
|
||||||
|
if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
|
||||||
|
if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
|
||||||
|
// If there's an exact match below, a stripe ends at "row". Stripe right boundary is
|
||||||
|
// exclusive, so that means the row is in the next stripe; thus, we need to add one to index.
|
||||||
|
// If there's no match, the return value of binarySearch is (-(insertion point) - 1), where
|
||||||
|
// insertion point is the index of the next greater element, or list size if none. The
|
||||||
|
// insertion point happens to be exactly what we need, so we need to add one to the result.
|
||||||
|
return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the start key for a given stripe.
|
||||||
|
* @param stripeIndex Stripe index.
|
||||||
|
* @return Start key. May be an open key.
|
||||||
|
*/
|
||||||
|
public final byte[] getStartRow(int stripeIndex) {
|
||||||
|
return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the start key for a given stripe.
|
||||||
|
* @param stripeIndex Stripe index.
|
||||||
|
* @return Start key. May be an open key.
|
||||||
|
*/
|
||||||
|
public final byte[] getEndRow(int stripeIndex) {
|
||||||
|
return (stripeIndex == state.stripeEndRows.length
|
||||||
|
? OPEN_KEY : state.stripeEndRows[stripeIndex]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] startOf(StoreFile sf) {
|
||||||
|
byte[] result = this.fileStarts.get(sf);
|
||||||
|
return result != null ? result : sf.getMetadataValue(STRIPE_START_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] endOf(StoreFile sf) {
|
||||||
|
byte[] result = this.fileEnds.get(sf);
|
||||||
|
return result != null ? result : sf.getMetadataValue(STRIPE_END_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inserts a file in the correct place (by seqnum) in a stripe copy.
|
||||||
|
* @param stripe Stripe copy to insert into.
|
||||||
|
* @param sf File to insert.
|
||||||
|
*/
|
||||||
|
private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
|
||||||
|
// The only operation for which sorting of the files matters is KeyBefore. Therefore,
|
||||||
|
// we will store the file in reverse order by seqNum from the outset.
|
||||||
|
for (int insertBefore = 0; ; ++insertBefore) {
|
||||||
|
if (insertBefore == stripe.size()
|
||||||
|
|| (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
|
||||||
|
stripe.add(insertBefore, sf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An extension of ConcatenatedLists that has several peculiar properties.
|
||||||
|
* First, one can cut the tail of the logical list by removing last several sub-lists.
|
||||||
|
* Second, items can be removed thru iterator.
|
||||||
|
* Third, if the sub-lists are immutable, they are replaced with mutable copies when needed.
|
||||||
|
* On average KeyBefore operation will contain half the stripes as potential candidates,
|
||||||
|
* but will quickly cut down on them as it finds something in the more likely ones; thus,
|
||||||
|
* the above allow us to avoid unnecessary copying of a bunch of lists.
|
||||||
|
*/
|
||||||
|
private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
|
||||||
|
@Override
|
||||||
|
public java.util.Iterator<StoreFile> iterator() {
|
||||||
|
return new Iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
|
||||||
|
public ArrayList<List<StoreFile>> getComponents() {
|
||||||
|
return components;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeComponents(int startIndex) {
|
||||||
|
List<List<StoreFile>> subList = components.subList(startIndex, components.size());
|
||||||
|
for (List<StoreFile> entry : subList) {
|
||||||
|
size -= entry.size();
|
||||||
|
}
|
||||||
|
assert size >= 0;
|
||||||
|
subList.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
if (!this.nextWasCalled) {
|
||||||
|
throw new IllegalStateException("No element to remove");
|
||||||
|
}
|
||||||
|
this.nextWasCalled = false;
|
||||||
|
List<StoreFile> src = components.get(currentComponent);
|
||||||
|
if (src instanceof ImmutableList<?>) {
|
||||||
|
src = new ArrayList<StoreFile>(src);
|
||||||
|
components.set(currentComponent, src);
|
||||||
|
}
|
||||||
|
src.remove(indexWithinComponent);
|
||||||
|
--size;
|
||||||
|
--indexWithinComponent;
|
||||||
|
if (src.isEmpty()) {
|
||||||
|
components.remove(currentComponent); // indexWithinComponent is already -1 here.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Non-static helper class for merging compaction results.
|
||||||
|
* Since we want to merge them atomically (more or less), it operates on lazy copies, and
|
||||||
|
* then applies copies to real lists as necessary.
|
||||||
|
*/
|
||||||
|
private class CompactionResultsMergeCopy {
|
||||||
|
private ArrayList<List<StoreFile>> stripeFiles = null;
|
||||||
|
private ArrayList<StoreFile> level0Files = null;
|
||||||
|
private ArrayList<byte[]> stripeEndRows = null;
|
||||||
|
|
||||||
|
private Collection<StoreFile> compactedFiles = null;
|
||||||
|
private Collection<StoreFile> results = null;
|
||||||
|
|
||||||
|
private List<StoreFile> l0Results = new ArrayList<StoreFile>();
|
||||||
|
|
||||||
|
public CompactionResultsMergeCopy() {
|
||||||
|
// Create a lazy mutable copy (other fields are so lazy they start out as nulls).
|
||||||
|
this.stripeFiles = new ArrayList<List<StoreFile>>(
|
||||||
|
StripeStoreFileManager.this.state.stripeFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
|
||||||
|
throws IOException {
|
||||||
|
assert this.compactedFiles == null && this.results == null;
|
||||||
|
this.compactedFiles = compactedFiles;
|
||||||
|
this.results = results;
|
||||||
|
// Do logical processing.
|
||||||
|
removeCompactedFiles();
|
||||||
|
TreeMap<byte[], StoreFile> newStripes = processCompactionResults();
|
||||||
|
if (newStripes != null) {
|
||||||
|
processNewCandidateStripes(newStripes);
|
||||||
|
}
|
||||||
|
// Create new state and update parent.
|
||||||
|
State state = createNewState();
|
||||||
|
StripeStoreFileManager.this.state = state;
|
||||||
|
updateMetadataMaps();
|
||||||
|
}
|
||||||
|
|
||||||
|
private State createNewState() {
|
||||||
|
State oldState = StripeStoreFileManager.this.state;
|
||||||
|
// Stripe count should be the same unless the end rows changed.
|
||||||
|
assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
|
||||||
|
State newState = new State();
|
||||||
|
newState.level0Files = (this.level0Files == null) ? oldState.level0Files
|
||||||
|
: ImmutableList.copyOf(this.level0Files);
|
||||||
|
newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
|
||||||
|
: this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
|
||||||
|
newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
|
||||||
|
for (List<StoreFile> newStripe : this.stripeFiles) {
|
||||||
|
newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
|
||||||
|
? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
|
||||||
|
newAllFiles.removeAll(compactedFiles);
|
||||||
|
newAllFiles.addAll(results);
|
||||||
|
newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
|
||||||
|
return newState;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateMetadataMaps() {
|
||||||
|
StripeStoreFileManager parent = StripeStoreFileManager.this;
|
||||||
|
for (StoreFile sf : this.compactedFiles) {
|
||||||
|
parent.fileStarts.remove(sf);
|
||||||
|
parent.fileEnds.remove(sf);
|
||||||
|
}
|
||||||
|
for (StoreFile sf : this.l0Results) {
|
||||||
|
parent.ensureLevel0Metadata(sf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param index Index of the stripe we need.
|
||||||
|
* @return A lazy stripe copy from current stripes.
|
||||||
|
*/
|
||||||
|
private final ArrayList<StoreFile> getStripeCopy(int index) {
|
||||||
|
List<StoreFile> stripeCopy = this.stripeFiles.get(index);
|
||||||
|
ArrayList<StoreFile> result = null;
|
||||||
|
if (stripeCopy instanceof ImmutableList<?>) {
|
||||||
|
result = new ArrayList<StoreFile>(stripeCopy);
|
||||||
|
this.stripeFiles.set(index, result);
|
||||||
|
} else {
|
||||||
|
result = (ArrayList<StoreFile>)stripeCopy;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return A lazy L0 copy from current state.
|
||||||
|
*/
|
||||||
|
private final ArrayList<StoreFile> getLevel0Copy() {
|
||||||
|
if (this.level0Files == null) {
|
||||||
|
this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
|
||||||
|
}
|
||||||
|
return this.level0Files;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process new files, and add them either to the structure of existing stripes,
|
||||||
|
* or to the list of new candidate stripes.
|
||||||
|
* @return New candidate stripes.
|
||||||
|
*/
|
||||||
|
private TreeMap<byte[], StoreFile> processCompactionResults() throws IOException {
|
||||||
|
TreeMap<byte[], StoreFile> newStripes = null;
|
||||||
|
for (StoreFile sf : this.results) {
|
||||||
|
byte[] startRow = startOf(sf), endRow = endOf(sf);
|
||||||
|
if (isInvalid(endRow) || isInvalid(startRow)) {
|
||||||
|
LOG.warn("The newly compacted files doesn't have stripe rows set: " + sf.getPath());
|
||||||
|
insertFileIntoStripe(getLevel0Copy(), sf);
|
||||||
|
this.l0Results.add(sf);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!this.stripeFiles.isEmpty()) {
|
||||||
|
int stripeIndex = findStripeIndexByEndRow(endRow);
|
          if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
            // Simple/common case - add file to an existing stripe.
            insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
            continue;
          }
        }

        // Make a new candidate stripe.
        if (newStripes == null) {
          newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
        }
        StoreFile oldSf = newStripes.put(endRow, sf);
        if (oldSf != null) {
          throw new IOException("Compactor has produced multiple files for the stripe ending in ["
              + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
        }
      }
      return newStripes;
    }

    /**
     * Remove compacted files from the stripe or level 0 structures they belong to.
     */
    private void removeCompactedFiles() throws IOException {
      for (StoreFile oldFile : this.compactedFiles) {
        byte[] oldEndRow = endOf(oldFile);
        List<StoreFile> source = null;
        if (isInvalid(oldEndRow)) {
          source = getLevel0Copy();
        } else {
          int stripeIndex = findStripeIndexByEndRow(oldEndRow);
          if (stripeIndex < 0) {
            throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
                + " to a known stripe (end row [" + Bytes.toString(oldEndRow) + "])");
          }
          source = getStripeCopy(stripeIndex);
        }
        if (!source.remove(oldFile)) {
          throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
        }
      }
    }

    /**
     * See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
     * new candidate stripes/removes old stripes; produces a new set of stripe end rows.
     * @param newStripes New stripes - files by end row.
     */
    private void processNewCandidateStripes(
        TreeMap<byte[], StoreFile> newStripes) throws IOException {
      // Validate that the removed and added aggregate ranges still make for a full key space.
      boolean hasStripes = !this.stripeFiles.isEmpty();
      this.stripeEndRows = new ArrayList<byte[]>(
          Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
      int removeFrom = 0;
      byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
      byte[] lastEndRow = newStripes.lastKey();
      if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
        throw new IOException("Newly created stripes do not cover the entire key space.");
      }

      if (hasStripes) {
        // Determine which stripes will need to be removed because they conflict with new stripes.
        // The new boundaries should match old stripe boundaries, so we should get exact matches.
        if (isOpen(firstStartRow)) {
          removeFrom = 0;
        } else {
          removeFrom = findStripeIndexByEndRow(firstStartRow);
          if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
          ++removeFrom;
        }
        int removeTo = findStripeIndexByEndRow(lastEndRow);
        if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
        // Remove old empty stripes.
        int originalCount = this.stripeFiles.size();
        for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
          if (!this.stripeFiles.get(removeIndex).isEmpty()) {
            throw new IOException("Compaction intends to create a new stripe that replaces an"
                + " existing one, but the latter contains some files.");
          }
          if (removeIndex != originalCount - 1) {
            this.stripeEndRows.remove(removeIndex);
          }
          this.stripeFiles.remove(removeIndex);
        }
      }

      // Now, insert new stripes. The total ranges match, so we can insert where we removed.
      byte[] previousEndRow = null;
      int insertAt = removeFrom;
      for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
        if (previousEndRow != null) {
          // Validate that the ranges are contiguous.
          assert !isOpen(previousEndRow);
          byte[] startRow = startOf(newStripe.getValue());
          if (!rowEquals(previousEndRow, startRow)) {
            throw new IOException("The new stripes produced by compaction are not contiguous");
          }
        }
        // Add the new stripe.
        ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
        tmp.add(newStripe.getValue());
        stripeFiles.add(insertAt, tmp);
        previousEndRow = newStripe.getKey();
        if (!isOpen(previousEndRow)) {
          stripeEndRows.add(insertAt, previousEndRow);
        }
        ++insertAt;
      }
    }
  }
}
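The contiguity rule that processNewCandidateStripes enforces is easy to state in isolation. A self-contained sketch, with a hypothetical Range type standing in for a store file's stripe metadata (null playing the role of an open key):

import java.util.Arrays;
import java.util.List;

// Sketch of the invariant: each new stripe must start exactly where the
// previous one ended; null stands for an open (unbounded) key.
public class StripeContiguityCheck {
  static class Range {
    final byte[] start;
    final byte[] end;
    Range(byte[] start, byte[] end) { this.start = start; this.end = end; }
  }

  static boolean isContiguous(List<Range> stripes) {
    byte[] previousEnd = null;
    for (Range r : stripes) {
      if (previousEnd != null && !Arrays.equals(previousEnd, r.start)) {
        return false; // gap or overlap between consecutive stripes
      }
      previousEnd = r.end;
    }
    return true;
  }

  public static void main(String[] args) {
    byte[] b = "bbb".getBytes(), c = "ccc".getBytes();
    // (-inf, b), [b, +inf): contiguous.
    System.out.println(isContiguous(Arrays.asList(
        new Range(null, b), new Range(b, null)))); // true
    // (-inf, b), [c, +inf): gap over [b, c).
    System.out.println(isContiguous(Arrays.asList(
        new Range(null, b), new Range(c, null)))); // false
  }
}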
@@ -0,0 +1,100 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.util.Bytes;

/** A mock used so our tests don't deal with actual StoreFiles. */
public class MockStoreFile extends StoreFile {
  long length = 0;
  boolean isRef = false;
  long ageInDisk;
  long sequenceid;
  private Map<byte[], byte[]> metadata = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
  byte[] splitPoint = null;

  MockStoreFile(HBaseTestingUtility testUtil, Path testPath,
      long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
    super(testUtil.getTestFileSystem(), testPath, testUtil.getConfiguration(),
        new CacheConfig(testUtil.getConfiguration()), BloomType.NONE,
        NoOpDataBlockEncoder.INSTANCE);
    this.length = length;
    this.isRef = isRef;
    this.ageInDisk = ageInDisk;
    this.sequenceid = sequenceid;
  }

  void setLength(long newLen) {
    this.length = newLen;
  }

  @Override
  byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
    return this.splitPoint;
  }

  @Override
  public long getMaxSequenceId() {
    return sequenceid;
  }

  @Override
  public boolean isMajorCompaction() {
    return false;
  }

  @Override
  public boolean isReference() {
    return this.isRef;
  }

  @Override
  boolean isBulkLoadResult() {
    return false;
  }

  @Override
  public byte[] getMetadataValue(byte[] key) {
    return this.metadata.get(key);
  }

  public void setMetadataValue(byte[] key, byte[] value) {
    this.metadata.put(key, value);
  }

  @Override
  public StoreFile.Reader getReader() {
    final long len = this.length;
    return new StoreFile.Reader() {
      @Override
      public long length() {
        return len;
      }
    };
  }
}
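The mock above is what the stripe tests further down feed to the manager. A brief usage sketch (the testPath variable is illustrative; the metadata keys are the ones the stripe tests use):

// Illustrative only: construct a mock file and tag it as covering ["bbb", "ccc").
MockStoreFile sf = new MockStoreFile(TEST_UTIL, testPath, 1024, 0, false, 1);
sf.setMetadataValue(StripeStoreFileManager.STRIPE_START_KEY, Bytes.toBytes("bbb"));
sf.setMetadataValue(StripeStoreFileManager.STRIPE_END_KEY, Bytes.toBytes("ccc"));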
@@ -125,64 +125,6 @@ public class TestDefaultCompactSelection extends TestCase {
     }
   }

-  // used so our tests don't deal with actual StoreFiles
-  static class MockStoreFile extends StoreFile {
-    long length = 0;
-    boolean isRef = false;
-    long ageInDisk;
-    long sequenceid;
-
-    MockStoreFile(long length, long ageInDisk, boolean isRef, long sequenceid) throws IOException {
-      super(TEST_UTIL.getTestFileSystem(), TEST_FILE, TEST_UTIL.getConfiguration(),
-        new CacheConfig(TEST_UTIL.getConfiguration()), BloomType.NONE,
-        NoOpDataBlockEncoder.INSTANCE);
-      this.length = length;
-      this.isRef = isRef;
-      this.ageInDisk = ageInDisk;
-      this.sequenceid = sequenceid;
-    }
-
-    void setLength(long newLen) {
-      this.length = newLen;
-    }
-
-    @Override
-    public long getMaxSequenceId() {
-      return sequenceid;
-    }
-
-    @Override
-    public boolean isMajorCompaction() {
-      return false;
-    }
-
-    @Override
-    public boolean isReference() {
-      return this.isRef;
-    }
-
-    @Override
-    public StoreFile.Reader getReader() {
-      final long len = this.length;
-      return new StoreFile.Reader() {
-        @Override
-        public long length() {
-          return len;
-        }
-      };
-    }
-
-    @Override
-    public String toString() {
-      return "MockStoreFile{" +
-        "length=" + length +
-        ", isRef=" + isRef +
-        ", ageInDisk=" + ageInDisk +
-        ", sequenceid=" + sequenceid +
-        '}';
-    }
-  }
-
   ArrayList<Long> toArrayList(long... numbers) {
     ArrayList<Long> result = new ArrayList<Long>();
     for (long i : numbers) {

@@ -216,7 +158,8 @@ public class TestDefaultCompactSelection extends TestCase {
       throws IOException {
     List<StoreFile> ret = Lists.newArrayList();
     for (int i = 0; i < sizes.size(); i++) {
-      ret.add(new MockStoreFile(sizes.get(i), ageInDisk.get(i), isReference, i));
+      ret.add(new MockStoreFile(TEST_UTIL, TEST_FILE,
+        sizes.get(i), ageInDisk.get(i), isReference, i));
     }
     return ret;
   }
@@ -0,0 +1,592 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static org.junit.Assert.*;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(SmallTests.class)
public class TestStripeStoreFileManager {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final Path BASEDIR =
      TEST_UTIL.getDataTestDir(TestStripeStoreFileManager.class.getSimpleName());
  private static final Path CFDIR = HStore.getStoreHomedir(BASEDIR, "region", Bytes.toBytes("cf"));

  private static final byte[] KEY_A = Bytes.toBytes("aaa");
  private static final byte[] KEY_B = Bytes.toBytes("bbb");
  private static final byte[] KEY_C = Bytes.toBytes("ccc");
  private static final byte[] KEY_D = Bytes.toBytes("ddd");

  private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
  private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
  private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
  private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);

  @Before
  public void setUp() throws Exception {
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    if (!fs.mkdirs(CFDIR)) {
      throw new IOException("Cannot create test directory " + CFDIR);
    }
  }

  @After
  public void tearDown() throws Exception {
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    if (fs.exists(CFDIR) && !fs.delete(CFDIR, true)) {
      throw new IOException("Cannot delete test directory " + CFDIR);
    }
  }
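
  // Illustrative sketch only (not part of the original patch): the tests
  // below share one lifecycle - load files, flush into level 0, then move
  // files into stripes via compaction results. createManager/createFile/al
  // are the helpers defined at the bottom of this class.
  @Test
  public void testLifecycleSketch() throws Exception {
    StripeStoreFileManager manager = createManager(); // loadFiles() on empty
    manager.insertNewFile(createFile());              // a flush lands in L0
    // "Compact" nothing out of L0 while adding two stripe files covering
    // (-inf, B) and [B, +inf); the key space must stay fully covered.
    manager.addCompactionResults(al(),
        al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, OPEN_KEY)));
    assertEquals(3, manager.getStorefileCount());     // 1 L0 + 2 stripe files
  }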

  @Test
  public void testInsertFilesIntoL0() throws Exception {
    StripeStoreFileManager manager = createManager();
    MockStoreFile sf = createFile();
    manager.insertNewFile(sf);
    assertEquals(1, manager.getStorefileCount());
    Collection<StoreFile> filesForGet = manager.getFilesForScanOrGet(true, KEY_A, KEY_A);
    assertEquals(1, filesForGet.size());
    assertTrue(filesForGet.contains(sf));

    // Add some stripes and make sure we get this file for every stripe.
    manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_B, OPEN_KEY)));
    assertTrue(manager.getFilesForScanOrGet(true, KEY_A, KEY_A).contains(sf));
    assertTrue(manager.getFilesForScanOrGet(true, KEY_C, KEY_C).contains(sf));
  }

  @Test
  public void testClearFiles() throws Exception {
    StripeStoreFileManager manager = createManager();
    manager.insertNewFile(createFile());
    manager.insertNewFile(createFile());
    manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_B, OPEN_KEY)));
    assertEquals(4, manager.getStorefileCount());
    Collection<StoreFile> allFiles = manager.clearFiles();
    assertEquals(4, allFiles.size());
    assertEquals(0, manager.getStorefileCount());
    assertEquals(0, manager.getStorefiles().size());
  }

  private static ArrayList<StoreFile> dumpIterator(Iterator<StoreFile> iter) {
    ArrayList<StoreFile> result = new ArrayList<StoreFile>();
    for (; iter.hasNext(); result.add(iter.next()));
    return result;
  }

  @Test
  public void testRowKeyBefore() throws Exception {
    StripeStoreFileManager manager = createManager();
    StoreFile l0File = createFile(), l0File2 = createFile();
    manager.insertNewFile(l0File);
    manager.insertNewFile(l0File2);
    // Get candidate files.
    Iterator<StoreFile> sfs = manager.getCandidateFilesForRowKeyBefore(KV_B);
    sfs.next();
    sfs.remove();
    // Suppose we found a candidate in this file... make sure the remaining L0 file is not removed.
    sfs = manager.updateCandidateFilesForRowKeyBefore(sfs, KV_B, KV_A);
    assertTrue(sfs.hasNext());
    // Now add some stripes (removing one L0 file too).
    MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B),
        stripe1 = createFile(KEY_B, OPEN_KEY);
    manager.addCompactionResults(al(l0File), al(stripe0a, stripe1));
    // If we want a key <= KEY_A, we should get everything except stripe1.
    ArrayList<StoreFile> sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A));
    assertEquals(2, sfsDump.size());
    assertTrue(sfsDump.contains(stripe0a));
    assertFalse(sfsDump.contains(stripe1));
    // If we want a key <= KEY_B, we should get everything, since the lower bound is inclusive.
    sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_B));
    assertEquals(3, sfsDump.size());
    assertTrue(sfsDump.contains(stripe1));
    // For KEY_D, we should also get everything.
    sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_D));
    assertEquals(3, sfsDump.size());
    // Suppose in the first file we found a candidate with KEY_C.
    // Then, stripe0a no longer matters and should be removed, but stripe1 should stay.
    sfs = manager.getCandidateFilesForRowKeyBefore(KV_D);
    sfs.next(); // Skip the L0 file.
    sfs.remove();
    sfs = manager.updateCandidateFilesForRowKeyBefore(sfs, KV_D, KV_C);
    assertEquals(stripe1, sfs.next());
    assertFalse(sfs.hasNext());
    // Add one more, later, file to stripe0 and remove the last annoying L0 file.
    // The new file should be returned in preference to the older one; also, after we get
    // a candidate from the first file, the old one should not be removed.
    StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B);
    manager.addCompactionResults(al(l0File2), al(stripe0b));
    sfs = manager.getCandidateFilesForRowKeyBefore(KV_A);
    assertEquals(stripe0b, sfs.next());
    sfs.remove();
    sfs = manager.updateCandidateFilesForRowKeyBefore(sfs, KV_A, KV_A);
    assertEquals(stripe0a, sfs.next());
  }
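
  // Illustrative sketch only (not part of the original patch): how a caller
  // might drive the candidate/update pair exercised above. The per-file
  // lookup is a hypothetical stub; only the manager calls are real.
  private KeyValue rowKeyBeforeSketch(StripeStoreFileManager manager, KeyValue targetKey)
      throws IOException {
    KeyValue candidate = null;
    Iterator<StoreFile> files = manager.getCandidateFilesForRowKeyBefore(targetKey);
    while (files.hasNext()) {
      KeyValue kv = seekToRowBefore(files.next(), targetKey);
      if (kv == null) continue; // nothing suitable in this file, try the next
      candidate = kv;
      // Report the find so the manager can drop files that cannot contain a
      // closer candidate and reprioritize the remaining ones.
      files = manager.updateCandidateFilesForRowKeyBefore(files, targetKey, candidate);
    }
    return candidate;
  }

  // Hypothetical per-file lookup, abstracted away for the sketch above.
  private KeyValue seekToRowBefore(StoreFile sf, KeyValue key) {
    return null;
  }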

  @Test
  public void testGetSplitPointEdgeCases() throws Exception {
    StripeStoreFileManager manager = createManager();
    // No files => no split.
    assertNull(manager.getSplitPoint());

    // If there are no stripes, should pick the midpoint from the biggest file in L0.
    MockStoreFile sf5 = createFile(5, 0);
    sf5.splitPoint = new byte[1];
    manager.insertNewFile(sf5);
    manager.insertNewFile(createFile(1, 0));
    assertEquals(sf5.splitPoint, manager.getSplitPoint());

    // Same if there's one stripe but the biggest file is still in L0.
    manager.addCompactionResults(al(), al(createFile(2, 0, OPEN_KEY, OPEN_KEY)));
    assertEquals(sf5.splitPoint, manager.getSplitPoint());

    // If the biggest file is in a stripe, should get the split point from it.
    MockStoreFile sf6 = createFile(6, 0, OPEN_KEY, OPEN_KEY);
    sf6.splitPoint = new byte[1];
    manager.addCompactionResults(al(), al(sf6));
    assertEquals(sf6.splitPoint, manager.getSplitPoint());
  }

  @Test
  public void testGetStripeBoundarySplits() throws Exception {
    /* First number - split must be after this stripe; further numbers - stripe sizes. */
    verifySplitPointScenario(5, false, 0f, 2, 1, 1, 1, 1, 1, 10);
    verifySplitPointScenario(0, false, 0f, 6, 3, 1, 1, 2);
    verifySplitPointScenario(2, false, 0f, 1, 1, 1, 1, 2);
    verifySplitPointScenario(0, false, 0f, 5, 4);
    verifySplitPointScenario(2, false, 0f, 5, 2, 5, 5, 5);
  }

  @Test
  public void testGetUnbalancedSplits() throws Exception {
    /* First number - split must be inside/after this stripe; further numbers - stripe sizes. */
    verifySplitPointScenario(0, false, 2.1f, 4, 4, 4); // 8/4 is less than 2.1f
    verifySplitPointScenario(1, true, 1.5f, 4, 4, 4); // 8/4 > 6/6
    verifySplitPointScenario(1, false, 1.1f, 3, 4, 1, 1, 2, 2); // 7/6 < 8/5
    verifySplitPointScenario(1, false, 1.1f, 3, 6, 1, 1, 2, 2); // 9/6 == 9/6
    verifySplitPointScenario(1, true, 1.1f, 3, 8, 1, 1, 2, 2); // 11/6 > 10/7
    verifySplitPointScenario(3, false, 1.1f, 2, 2, 1, 1, 4, 3); // reverse order
    verifySplitPointScenario(4, true, 1.1f, 2, 2, 1, 1, 8, 3); // reverse order
    verifySplitPointScenario(0, true, 1.5f, 10, 4); // 10/4 > 9/5
    verifySplitPointScenario(0, false, 1.4f, 6, 4); // 6/4 == 6/4
    verifySplitPointScenario(1, true, 1.5f, 4, 10); // reverse just in case
    verifySplitPointScenario(0, false, 1.4f, 4, 6); // reverse just in case
  }

  /**
   * Verifies a scenario for finding a split point.
   * @param splitPointAfter Stripe to expect the split point at/after.
   * @param shouldSplitStripe If true, the split point is expected in the middle of the above
   *                          stripe; if false, it should be at the end.
   * @param splitRatioToVerify Maximum split imbalance ratio.
   * @param sizes Stripe sizes.
   */
  private void verifySplitPointScenario(int splitPointAfter, boolean shouldSplitStripe,
      float splitRatioToVerify, int... sizes) throws Exception {
    assertTrue(sizes.length > 1);
    ArrayList<StoreFile> sfs = new ArrayList<StoreFile>();
    for (int sizeIx = 0; sizeIx < sizes.length; ++sizeIx) {
      byte[] startKey = (sizeIx == 0) ? OPEN_KEY : Bytes.toBytes(sizeIx - 1);
      byte[] endKey = (sizeIx == sizes.length - 1) ? OPEN_KEY : Bytes.toBytes(sizeIx);
      MockStoreFile sf = createFile(sizes[sizeIx], 0, startKey, endKey);
      sf.splitPoint = Bytes.toBytes(-sizeIx); // Set the split point to the negative index.
      sfs.add(sf);
    }

    Configuration conf = HBaseConfiguration.create();
    if (splitRatioToVerify != 0) {
      conf.setFloat(StripeStoreConfig.MAX_SPLIT_IMBALANCE, splitRatioToVerify);
    }
    StripeStoreFileManager manager = createManager(al(), conf);
    manager.addCompactionResults(al(), sfs);
    int result = Bytes.toInt(manager.getSplitPoint());
    // Either an end key, and thus a positive index, or the "middle" of a file, and thus a
    // negative index.
    assertEquals(splitPointAfter * (shouldSplitStripe ? -1 : 1), result);
  }
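
  // Illustrative test (not part of the original patch): a worked case of the
  // encoding used by verifySplitPointScenario above. With stripe sizes
  // {4, 4, 4}, the end keys are Bytes.toBytes(0) and Bytes.toBytes(1), and
  // each stripe's mock split point is Bytes.toBytes(-index). A boundary
  // split after stripe 0 leaves sides of 4 and 8 (ratio 2.0): acceptable
  // when MAX_SPLIT_IMBALANCE is 2.1, so the boundary key 0 is returned;
  // too lopsided at 1.5, so the manager splits inside stripe 1 (6 vs 6)
  // and that stripe's mock split point, -1, is returned instead.
  @Test
  public void testSplitPointEncodingSketch() throws Exception {
    verifySplitPointScenario(0, false, 2.1f, 4, 4, 4); // boundary split: 0
    verifySplitPointScenario(1, true, 1.5f, 4, 4, 4);  // in-stripe split: -1
  }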

  private static byte[] keyAfter(byte[] key) {
    return Arrays.copyOf(key, key.length + 1);
  }

  @Test
  public void testGetFilesForGetAndScan() throws Exception {
    StripeStoreFileManager manager = createManager();
    verifyGetAndScanScenario(manager, null, null);
    verifyGetAndScanScenario(manager, KEY_B, KEY_C);

    // Populate one L0 file.
    MockStoreFile sf0 = createFile();
    manager.insertNewFile(sf0);
    verifyGetAndScanScenario(manager, null, null, sf0);
    verifyGetAndScanScenario(manager, null, KEY_C, sf0);
    verifyGetAndScanScenario(manager, KEY_B, null, sf0);
    verifyGetAndScanScenario(manager, KEY_B, KEY_C, sf0);

    // Populate a bunch of files for stripes, keep L0.
    MockStoreFile sfA = createFile(OPEN_KEY, KEY_A);
    MockStoreFile sfB = createFile(KEY_A, KEY_B);
    MockStoreFile sfC = createFile(KEY_B, KEY_C);
    MockStoreFile sfD = createFile(KEY_C, KEY_D);
    MockStoreFile sfE = createFile(KEY_D, OPEN_KEY);
    manager.addCompactionResults(al(), al(sfA, sfB, sfC, sfD, sfE));

    verifyGetAndScanScenario(manager, null, null, sf0, sfA, sfB, sfC, sfD, sfE);
    verifyGetAndScanScenario(manager, keyAfter(KEY_A), null, sf0, sfB, sfC, sfD, sfE);
    verifyGetAndScanScenario(manager, null, keyAfter(KEY_C), sf0, sfA, sfB, sfC, sfD);
    verifyGetAndScanScenario(manager, KEY_B, null, sf0, sfC, sfD, sfE);
    verifyGetAndScanScenario(manager, null, KEY_C, sf0, sfA, sfB, sfC, sfD);
    verifyGetAndScanScenario(manager, KEY_B, keyAfter(KEY_B), sf0, sfC);
    verifyGetAndScanScenario(manager, keyAfter(KEY_A), KEY_B, sf0, sfB, sfC);
    verifyGetAndScanScenario(manager, KEY_D, KEY_D, sf0, sfE);
    verifyGetAndScanScenario(manager, keyAfter(KEY_B), keyAfter(KEY_C), sf0, sfC, sfD);
  }

  private void verifyGetAndScanScenario(StripeStoreFileManager manager,
      byte[] start, byte[] end, StoreFile... results) throws Exception {
    verifyGetOrScanScenario(manager, true, start, end, results);
    verifyGetOrScanScenario(manager, false, start, end, results);
  }
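
  // Illustrative sketch only (not part of the original patch): the scenarios
  // above reduce to half-open interval overlap. A stripe [stripeStart,
  // stripeEnd) is selected for a query [queryStart, queryEnd] when the two
  // intersect; zero-length arrays stand for open (unbounded) keys, matching
  // HConstants.EMPTY_START_ROW / EMPTY_END_ROW.
  private static boolean overlapsSketch(byte[] stripeStart, byte[] stripeEnd,
      byte[] queryStart, byte[] queryEnd) {
    boolean startsBeforeStripeEnd = stripeEnd.length == 0
        || Bytes.compareTo(queryStart, stripeEnd) < 0;  // end row is exclusive
    boolean endsAtOrAfterStripeStart = stripeStart.length == 0 || queryEnd.length == 0
        || Bytes.compareTo(queryEnd, stripeStart) >= 0; // start row is inclusive
    return startsBeforeStripeEnd && endsAtOrAfterStripeStart;
  }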

  @Test
  @SuppressWarnings("unchecked")
  public void testLoadFilesWithRecoverableBadFiles() throws Exception {
    // L0 will receive a file w/o metadata (real L0), 3 files with invalid metadata, and 3
    // files that overlap valid stripes in various ways. Note that the 4th way to overlap the
    // stripes would cause the structure to be mostly scraped, and is tested separately.
    ArrayList<StoreFile> validStripeFiles = al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY),
        createFile(KEY_C, OPEN_KEY));
    ArrayList<StoreFile> filesToGoToL0 = al(createFile(), createFile(null, KEY_A),
        createFile(KEY_D, null), createFile(KEY_D, KEY_A), createFile(keyAfter(KEY_A), KEY_C),
        createFile(OPEN_KEY, KEY_D), createFile(KEY_D, keyAfter(KEY_D)));
    ArrayList<StoreFile> allFilesToGo = flattenLists(validStripeFiles, filesToGoToL0);
    Collections.shuffle(allFilesToGo);
    StripeStoreFileManager manager = createManager(allFilesToGo);
    List<StoreFile> l0Files = manager.getLevel0Files();
    assertEquals(filesToGoToL0.size(), l0Files.size());
    for (StoreFile sf : filesToGoToL0) {
      assertTrue(l0Files.contains(sf));
    }
    verifyAllFiles(manager, allFilesToGo);
  }

  @Test
  public void testLoadFilesWithBadStripe() throws Exception {
    // The current "algorithm" will see the after-B key before the C key, add it as a valid
    // stripe, and then fail all other stripes. So everything ends up in L0.
    ArrayList<StoreFile> allFilesToGo = al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY),
        createFile(KEY_B, keyAfter(KEY_B)));
    Collections.shuffle(allFilesToGo);
    StripeStoreFileManager manager = createManager(allFilesToGo);
    assertEquals(allFilesToGo.size(), manager.getLevel0Files().size());
  }

  @Test
  public void testLoadFilesWithGaps() throws Exception {
    // Stripes must not have gaps. If they do, everything goes to L0.
    StripeStoreFileManager manager =
        createManager(al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY)));
    assertEquals(2, manager.getLevel0Files().size());
    // Just one open stripe should be ok.
    manager = createManager(al(createFile(OPEN_KEY, OPEN_KEY)));
    assertEquals(0, manager.getLevel0Files().size());
    assertEquals(1, manager.getStorefileCount());
  }

  @Test
  public void testLoadFilesAfterSplit() throws Exception {
    // If stripes are good but have non-open ends, they must be treated as open ends.
    MockStoreFile sf = createFile(KEY_B, KEY_C);
    StripeStoreFileManager manager = createManager(al(createFile(OPEN_KEY, KEY_B), sf));
    assertEquals(0, manager.getLevel0Files().size());
    // Here, [B, C] is logically [B, inf), so we should only be able to compact it to that.
    verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
    manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY)));
    // Do the same for the other variants.
    manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY)));
    verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
    manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C)));
    manager = createManager(al(sf));
    verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
    manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY)));
  }

  @Test
  @SuppressWarnings("unchecked")
  public void testAddingCompactionResults() throws Exception {
    StripeStoreFileManager manager = createManager();
    // First, add some L0 files and "compact" one with new stripe creation.
    StoreFile sf_L0_0a = createFile();
    StoreFile sf_L0_0b = createFile();
    manager.insertNewFile(sf_L0_0a);
    manager.insertNewFile(sf_L0_0b);

    // Try compacting with invalid new stripe ranges (gaps, overlaps) - no effect.
    verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B)));
    verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_C, OPEN_KEY)));
    verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_B, OPEN_KEY), createFile(KEY_A, KEY_D)));
    verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_A, KEY_B), createFile(KEY_B, OPEN_KEY)));

    StoreFile sf_i2B_0 = createFile(OPEN_KEY, KEY_B);
    StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C);
    StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY);
    manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0));
    verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0));

    // Add another L0 file, "compact" both L0 files into two stripes.
    StoreFile sf_L0_1 = createFile();
    StoreFile sf_i2B_1 = createFile(OPEN_KEY, KEY_B);
    StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
    manager.insertNewFile(sf_L0_1);
    manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
    verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));

    // Try compacting with an invalid file (no metadata) - should add the file to L0.
    StoreFile sf_L0_2 = createFile(null, null);
    manager.addCompactionResults(al(), al(sf_L0_2));
    verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2));
    // Remove it...
    manager.addCompactionResults(al(sf_L0_2), al());

    // Do a regular compaction in the first stripe.
    StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B);
    manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
    verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));

    // Try to rebalance two stripes, but don't take all files from them - no effect.
    StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
    StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
    verifyInvalidCompactionScenario(manager, al(sf_B2C_0, sf_C2i_0), al(sf_B2D_4, sf_D2i_4));

    // Rebalance the two stripes correctly.
    manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
    verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));

    // Split the first stripe.
    StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
    StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
    manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
    verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));

    // Split the middle stripe.
    StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
    StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
    manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
    verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));

    // Merge two different middle stripes.
    StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C);
    manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
    verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));

    // Try various range mismatch cases in replaced and new data - no effect.
    ArrayList<StoreFile> tmp = al(sf_A2C_7, sf_C2D_6); // [A, D)
    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, KEY_C)));
    verifyInvalidCompactionScenario(manager, tmp, al(createFile(OPEN_KEY, KEY_D)));
    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, OPEN_KEY)));
    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, KEY_B)));
    verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, keyAfter(KEY_B))));

    // Merge the lower half.
    StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
    manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
    verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8));

    // Merge all.
    StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY);
    manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9));
    verifyAllFiles(manager, al(sf_i2i_9));
  }
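
  // Illustrative sketch only (not part of the original patch): the essence
  // of what testAddingCompactionResults checks. A compaction's added ranges
  // must be contiguous among themselves and span exactly the interval of
  // the ranges removed; ints stand in for row keys and int[]{start, end}
  // is a half-open range. The real check is stricter - boundaries must also
  // line up with existing stripes. 'added' is assumed to arrive in key
  // order, as the manager's TreeMap would yield it.
  private static boolean validReplacementSketch(List<int[]> removed, List<int[]> added) {
    if (added.isEmpty() || removed.isEmpty()) return added.isEmpty() && removed.isEmpty();
    for (int i = 1; i < added.size(); ++i) {
      if (added.get(i - 1)[1] != added.get(i)[0]) return false; // gap or overlap
    }
    int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;
    for (int[] r : removed) {
      min = Math.min(min, r[0]);
      max = Math.max(max, r[1]);
    }
    return added.get(0)[0] == min && added.get(added.size() - 1)[1] == max;
  }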

  @Test
  public void testEmptyResultsForStripes() throws Exception {
    // Test that we can compact L0 into a subset of stripes.
    StripeStoreFileManager manager = createManager();
    StoreFile sf0a = createFile();
    StoreFile sf0b = createFile();
    manager.insertNewFile(sf0a);
    manager.insertNewFile(sf0b);
    ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
        createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
    manager.addCompactionResults(al(sf0a), compacted);
    // The next L0 compaction only produces files for the first and last stripes.
    ArrayList<StoreFile> compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY));
    manager.addCompactionResults(al(sf0b), compacted2);
    compacted.addAll(compacted2);
    verifyAllFiles(manager, compacted);
  }

  @Test
  public void testPriority() throws Exception {
    // Parameters: expected priority, file limit, stripe count, files per stripe, L0 files.
    testPriorityScenario(5, 5, 0, 0, 0);
    testPriorityScenario(2, 5, 0, 0, 3);
    testPriorityScenario(4, 25, 5, 1, 0); // example case.
    testPriorityScenario(3, 25, 5, 1, 1); // L0 files count for all stripes.
    testPriorityScenario(3, 25, 5, 2, 0); // a file in each stripe - same as one L0 file.
    testPriorityScenario(2, 25, 5, 4, 0); // 1 is the user priority, so 2 is returned.
    testPriorityScenario(2, 25, 5, 4, 4); // don't return higher than user priority unless over the limit.
    testPriorityScenario(2, 25, 5, 1, 10); // same.
    testPriorityScenario(0, 25, 5, 4, 5); // at the limit.
    testPriorityScenario(-5, 25, 5, 6, 0); // over the limit!
    testPriorityScenario(-1, 25, 0, 0, 26); // over the limit with just L0.
  }

  private void testPriorityScenario(int expectedPriority,
      int limit, int stripes, int filesInStripe, int l0Files) throws Exception {
    final byte[][] keys = { KEY_A, KEY_B, KEY_C, KEY_D };
    assertTrue(stripes <= keys.length + 1);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt("hbase.hstore.blockingStoreFiles", limit);
    StripeStoreFileManager sfm = createManager(al(), conf);
    for (int i = 0; i < l0Files; ++i) {
      sfm.insertNewFile(createFile());
    }
    for (int i = 0; i < filesInStripe; ++i) {
      ArrayList<StoreFile> stripe = new ArrayList<StoreFile>();
      for (int j = 0; j < stripes; ++j) {
        stripe.add(createFile(
            (j == 0) ? OPEN_KEY : keys[j - 1], (j == stripes - 1) ? OPEN_KEY : keys[j]));
      }
      sfm.addCompactionResults(al(), stripe);
    }
    assertEquals(expectedPriority, sfm.getStoreCompactionPriority());
  }

  private void verifyInvalidCompactionScenario(StripeStoreFileManager manager,
      ArrayList<StoreFile> filesToCompact, ArrayList<StoreFile> filesToInsert) throws Exception {
    Collection<StoreFile> allFiles = manager.getStorefiles();
    try {
      manager.addCompactionResults(filesToCompact, filesToInsert);
      fail("Should have thrown");
    } catch (IOException ex) {
      // Expected.
    }
    verifyAllFiles(manager, allFiles); // Must have the same files.
  }

  private void verifyGetOrScanScenario(StripeStoreFileManager manager, boolean isGet,
      byte[] start, byte[] end, StoreFile... results) throws Exception {
    verifyGetOrScanScenario(manager, isGet, start, end, Arrays.asList(results));
  }

  private void verifyGetOrScanScenario(StripeStoreFileManager manager, boolean isGet,
      byte[] start, byte[] end, Collection<StoreFile> results) throws Exception {
    start = start != null ? start : HConstants.EMPTY_START_ROW;
    end = end != null ? end : HConstants.EMPTY_END_ROW;
    Collection<StoreFile> sfs = manager.getFilesForScanOrGet(isGet, start, end);
    assertEquals(results.size(), sfs.size());
    for (StoreFile result : results) {
      assertTrue(sfs.contains(result));
    }
  }

  private void verifyAllFiles(
      StripeStoreFileManager manager, Collection<StoreFile> results) throws Exception {
    verifyGetOrScanScenario(manager, false, null, null, results);
  }

  private static MockStoreFile createFile(
      long size, long seqNum, byte[] startKey, byte[] endKey) throws Exception {
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    Path testFilePath = StoreFile.getUniqueFile(fs, CFDIR);
    fs.create(testFilePath);
    MockStoreFile sf = new MockStoreFile(TEST_UTIL, testFilePath, size, 0, false, seqNum);
    if (startKey != null) {
      sf.setMetadataValue(StripeStoreFileManager.STRIPE_START_KEY, startKey);
    }
    if (endKey != null) {
      sf.setMetadataValue(StripeStoreFileManager.STRIPE_END_KEY, endKey);
    }
    return sf;
  }

  private static MockStoreFile createFile(long size, long seqNum) throws Exception {
    return createFile(size, seqNum, null, null);
  }

  private static MockStoreFile createFile(byte[] startKey, byte[] endKey) throws Exception {
    return createFile(0, 0, startKey, endKey);
  }

  private static MockStoreFile createFile() throws Exception {
    return createFile(null, null);
  }

  private static StripeStoreFileManager createManager() throws Exception {
    return createManager(new ArrayList<StoreFile>());
  }

  private static StripeStoreFileManager createManager(ArrayList<StoreFile> sfs) throws Exception {
    return createManager(sfs, TEST_UTIL.getConfiguration());
  }

  private static StripeStoreFileManager createManager(
      ArrayList<StoreFile> sfs, Configuration conf) throws Exception {
    StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf);
    result.loadFiles(sfs);
    return result;
  }

  private static ArrayList<StoreFile> al(StoreFile... sfs) {
    return new ArrayList<StoreFile>(Arrays.asList(sfs));
  }

  private static ArrayList<StoreFile> flattenLists(ArrayList<StoreFile>... sfls) {
    ArrayList<StoreFile> result = new ArrayList<StoreFile>();
    for (ArrayList<StoreFile> sfl : sfls) {
      result.addAll(sfl);
    }
    return result;
  }
}