Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
c253f8f809
|
@ -0,0 +1,173 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
/**
|
||||
* A comparator for case where {@link ByteBufferKeyValue} is prevalent type (BBKV
|
||||
* is base-type in hbase2). Takes a general comparator as fallback in case types are NOT the
|
||||
* expected ByteBufferKeyValue.
|
||||
*
|
||||
* <p>This is a tricked-out Comparator at heart of hbase read and write. It is in
|
||||
* the HOT path so we try all sorts of ugly stuff so we can go faster. See below
|
||||
* in this javadoc comment for the list.
|
||||
*
|
||||
* <p>Apply this comparator narrowly so it is fed exclusively ByteBufferKeyValues
|
||||
* as much as is possible so JIT can settle (e.g. make one per ConcurrentSkipListMap
|
||||
* in HStore).
|
||||
*
|
||||
* <p>Exploits specially added methods in BBKV to save on deserializations of shorts,
|
||||
* longs, etc: i.e. calculating the family length requires row length; pass it in
|
||||
* rather than recalculate it, and so on.
|
||||
*
|
||||
* <p>This comparator does static dispatch to private final methods so hotspot is comfortable
|
||||
* deciding inline.
|
||||
*
|
||||
* <p>Measurement has it that we almost have it so all inlines from memstore
|
||||
* ConcurrentSkipListMap on down to the (unsafe) intrinisics that do byte compare
|
||||
* and deserialize shorts and ints; needs a bit more work.
|
||||
*
|
||||
* <p>Does not take a Type to compare: i.e. it is not a Comparator<Cell> or
|
||||
* CellComparator<Cell> or Comparator<ByteBufferKeyValue> because that adds
|
||||
* another method to the hierarchy -- from compare(Object, Object)
|
||||
* to dynamic compare(Cell, Cell) to static private compare -- and inlining doesn't happen if
|
||||
* hierarchy is too deep (it is the case here).
|
||||
*
|
||||
* <p>Be careful making changes. Compare perf before and after and look at what
|
||||
* hotspot ends up generating before committing change (jitwatch is helpful here).
|
||||
* Changing this one class doubled write throughput (HBASE-20483).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BBKVComparator implements Comparator {
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(BBKVComparator.class);
|
||||
private final Comparator fallback;
|
||||
|
||||
public BBKVComparator(Comparator fallback) {
|
||||
this.fallback = fallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(Object l, Object r) {
|
||||
// LOG.info("ltype={} rtype={}", l, r);
|
||||
if ((l instanceof ByteBufferKeyValue) && (r instanceof ByteBufferKeyValue)) {
|
||||
return compare((ByteBufferKeyValue)l, (ByteBufferKeyValue)r);
|
||||
}
|
||||
// Skip calling compare(Object, Object) and go direct to compare(Cell, Cell)
|
||||
return this.fallback.compare((Cell)l, (Cell)r);
|
||||
}
|
||||
|
||||
// TODO: Come back here. We get a few percentage points extra of throughput if this is a
|
||||
// private method.
|
||||
static final int compare(ByteBufferKeyValue left, ByteBufferKeyValue right) {
|
||||
// NOTE: Same method is in CellComparatorImpl, also private, not shared, intentionally. Not
|
||||
// sharing gets us a few percent more throughput in compares. If changes here or there, make
|
||||
// sure done in both places.
|
||||
|
||||
// Compare Rows. Cache row length.
|
||||
int leftRowLength = left.getRowLength();
|
||||
int rightRowLength = right.getRowLength();
|
||||
int diff = ByteBufferUtils.compareTo(left.getRowByteBuffer(), left.getRowPosition(),
|
||||
leftRowLength,
|
||||
right.getRowByteBuffer(), right.getRowPosition(), rightRowLength);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
||||
// If the column is not specified, the "minimum" key type appears as latest in the sorted
|
||||
// order, regardless of the timestamp. This is used for specifying the last key/value in a
|
||||
// given row, because there is no "lexicographically last column" (it would be infinitely long).
|
||||
// The "maximum" key type does not need this behavior. Copied from KeyValue. This is bad in that
|
||||
// we can't do memcmp w/ special rules like this.
|
||||
// TODO: Is there a test for this behavior?
|
||||
int leftFamilyLengthPosition = left.getFamilyLengthPosition(leftRowLength);
|
||||
int leftFamilyLength = left.getFamilyLength(leftFamilyLengthPosition);
|
||||
int leftKeyLength = left.getKeyLength();
|
||||
int leftQualifierLength = left.getQualifierLength(leftKeyLength, leftRowLength,
|
||||
leftFamilyLength);
|
||||
|
||||
// No need of left row length below here.
|
||||
|
||||
byte leftType = left.getTypeByte(leftKeyLength);
|
||||
if (leftFamilyLength + leftQualifierLength == 0 &&
|
||||
leftType == KeyValue.Type.Minimum.getCode()) {
|
||||
// left is "bigger", i.e. it appears later in the sorted order
|
||||
return 1;
|
||||
}
|
||||
|
||||
int rightFamilyLengthPosition = right.getFamilyLengthPosition(rightRowLength);
|
||||
int rightFamilyLength = right.getFamilyLength(rightFamilyLengthPosition);
|
||||
int rightKeyLength = right.getKeyLength();
|
||||
int rightQualifierLength = right.getQualifierLength(rightKeyLength, rightRowLength,
|
||||
rightFamilyLength);
|
||||
|
||||
// No need of right row length below here.
|
||||
|
||||
byte rightType = right.getTypeByte(rightKeyLength);
|
||||
if (rightFamilyLength + rightQualifierLength == 0 &&
|
||||
rightType == KeyValue.Type.Minimum.getCode()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Compare families.
|
||||
int leftFamilyPosition = left.getFamilyPosition(leftFamilyLengthPosition);
|
||||
int rightFamilyPosition = right.getFamilyPosition(rightFamilyLengthPosition);
|
||||
diff = ByteBufferUtils.compareTo(left.getFamilyByteBuffer(), leftFamilyPosition,
|
||||
leftFamilyLength,
|
||||
right.getFamilyByteBuffer(), rightFamilyPosition, rightFamilyLength);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
||||
// Compare qualifiers
|
||||
diff = ByteBufferUtils.compareTo(left.getQualifierByteBuffer(),
|
||||
left.getQualifierPosition(leftFamilyPosition, leftFamilyLength), leftQualifierLength,
|
||||
right.getQualifierByteBuffer(),
|
||||
right.getQualifierPosition(rightFamilyPosition, rightFamilyLength),
|
||||
rightQualifierLength);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
||||
// Timestamps.
|
||||
// Swap order we pass into compare so we get DESCENDING order.
|
||||
diff = Long.compare(right.getTimestamp(rightKeyLength), left.getTimestamp(leftKeyLength));
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
||||
// Compare types. Let the delete types sort ahead of puts; i.e. types
|
||||
// of higher numbers sort before those of lesser numbers. Maximum (255)
|
||||
// appears ahead of everything, and minimum (0) appears after
|
||||
// everything.
|
||||
diff = (0xff & rightType) - (0xff & leftType);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
||||
// Negate following comparisons so later edits show up first mvccVersion: later sorts first
|
||||
return Longs.compare(right.getSequenceId(), left.getSequenceId());
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import java.util.Comparator;
|
|||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Comparator for comparing cells and has some specialized methods that allows comparing individual
|
||||
* cell components like row, family, qualifier and timestamp
|
||||
|
@ -130,4 +131,11 @@ public interface CellComparator extends Comparator<Cell> {
|
|||
* timestamp 0 if both timestamps are equal
|
||||
*/
|
||||
int compareTimestamps(long leftCellts, long rightCellts);
|
||||
|
||||
/**
|
||||
* @return A dumbed-down, fast comparator for hbase2 base-type, the {@link ByteBufferKeyValue}.
|
||||
* Create an instance when you make a new memstore, when you know only BBKVs will be passed.
|
||||
* Do not pollute with types other than BBKV if can be helped; the Comparator will slow.
|
||||
*/
|
||||
Comparator getSimpleComparator();
|
||||
}
|
||||
|
|
|
@ -18,15 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
|
||||
|
||||
|
||||
/**
|
||||
* Compare two HBase cells. Do not use this method comparing <code>-ROOT-</code> or
|
||||
|
@ -34,9 +37,13 @@ import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
|
|||
* takes account of the special formatting of the row where we have commas to delimit table from
|
||||
* regionname, from row. See KeyValue for how it has a special comparator to do hbase:meta cells
|
||||
* and yet another for -ROOT-.
|
||||
* While using this comparator for {{@link #compareRows(Cell, Cell)} et al, the hbase:meta cells
|
||||
* <p>While using this comparator for {{@link #compareRows(Cell, Cell)} et al, the hbase:meta cells
|
||||
* format should be taken into consideration, for which the instance of this comparator
|
||||
* should be used. In all other cases the static APIs in this comparator would be enough
|
||||
* <p>HOT methods. We spend a good portion of CPU comparing. Anything that makes the compare
|
||||
* faster will likely manifest at the macro level. See also
|
||||
* {@link BBKVComparator}. Use it when mostly {@link ByteBufferKeyValue}s.
|
||||
* </p>
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
||||
value="UNKNOWN",
|
||||
|
@ -57,21 +64,17 @@ public class CellComparatorImpl implements CellComparator {
|
|||
public static final CellComparatorImpl META_COMPARATOR = new MetaCellComparator();
|
||||
|
||||
@Override
|
||||
public int compare(Cell a, Cell b) {
|
||||
public final int compare(final Cell a, final Cell b) {
|
||||
return compare(a, b, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare cells.
|
||||
* @param ignoreSequenceid True if we are to compare the key portion only and ignore
|
||||
* the sequenceid. Set to false to compare key and consider sequenceid.
|
||||
* @return 0 if equal, -1 if a < b, and +1 if a > b.
|
||||
*/
|
||||
@Override
|
||||
public int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
|
||||
|
||||
int diff = 0;
|
||||
// "Peel off" the most common path.
|
||||
if (a instanceof ByteBufferKeyValue && b instanceof ByteBufferKeyValue) {
|
||||
diff = compareByteBufferKeyValue((ByteBufferKeyValue)a, (ByteBufferKeyValue)b);
|
||||
diff = BBKVComparator.compare((ByteBufferKeyValue)a, (ByteBufferKeyValue)b);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
@ -88,82 +91,7 @@ public class CellComparatorImpl implements CellComparator {
|
|||
}
|
||||
|
||||
// Negate following comparisons so later edits show up first mvccVersion: later sorts first
|
||||
return ignoreSequenceid? diff: Longs.compare(b.getSequenceId(), a.getSequenceId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Specialized comparator for the ByteBufferKeyValue type exclusivesly.
|
||||
* Caches deserialized lengths of rows and families, etc., and reuses them where it can
|
||||
* (ByteBufferKeyValue has been changed to be amenable to our providing pre-made lengths, etc.)
|
||||
*/
|
||||
private static final int compareByteBufferKeyValue(ByteBufferKeyValue left,
|
||||
ByteBufferKeyValue right) {
|
||||
// Compare Rows. Cache row length.
|
||||
int leftRowLength = left.getRowLength();
|
||||
int rightRowLength = right.getRowLength();
|
||||
int diff = ByteBufferUtils.compareTo(
|
||||
left.getRowByteBuffer(), left.getRowPosition(), leftRowLength,
|
||||
right.getRowByteBuffer(), right.getRowPosition(), rightRowLength);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
||||
// If the column is not specified, the "minimum" key type appears the
|
||||
// latest in the sorted order, regardless of the timestamp. This is used
|
||||
// for specifying the last key/value in a given row, because there is no
|
||||
// "lexicographically last column" (it would be infinitely long). The
|
||||
// "maximum" key type does not need this behavior.
|
||||
// Copied from KeyValue. This is bad in that we can't do memcmp w/ special rules like this.
|
||||
// I tried to get rid of the above but scanners depend on it. TODO.
|
||||
int leftFamilyLengthPosition = left.getFamilyLengthPosition(leftRowLength);
|
||||
int leftFamilyLength = left.getFamilyLength(leftFamilyLengthPosition);
|
||||
int rightFamilyLengthPosition = right.getFamilyLengthPosition(rightRowLength);
|
||||
int rightFamilyLength = right.getFamilyLength(rightFamilyLengthPosition);
|
||||
int leftKeyLength = left.getKeyLength();
|
||||
int leftQualifierLength = left.getQualifierLength(leftKeyLength, leftRowLength,
|
||||
leftFamilyLength);
|
||||
byte leftType = left.getTypeByte(leftKeyLength);
|
||||
if (leftFamilyLength + leftQualifierLength == 0 && leftType == Type.Minimum.getCode()) {
|
||||
// left is "bigger", i.e. it appears later in the sorted order
|
||||
return 1;
|
||||
}
|
||||
int rightKeyLength = right.getKeyLength();
|
||||
int rightQualifierLength = right.getQualifierLength(rightKeyLength, rightRowLength,
|
||||
rightFamilyLength);
|
||||
byte rightType = right.getTypeByte(rightKeyLength);
|
||||
if (rightFamilyLength + rightQualifierLength == 0 && rightType == Type.Minimum.getCode()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Compare families.
|
||||
int leftFamilyPosition = left.getFamilyPosition(leftFamilyLengthPosition);
|
||||
int rightFamilyPosition = right.getFamilyPosition(rightFamilyLengthPosition);
|
||||
diff = ByteBufferUtils.compareTo(
|
||||
left.getFamilyByteBuffer(), leftFamilyPosition, leftFamilyLength,
|
||||
right.getFamilyByteBuffer(), rightFamilyPosition, rightFamilyLength);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
// Compare qualifiers
|
||||
diff = ByteBufferUtils.compareTo(left.getQualifierByteBuffer(),
|
||||
left.getQualifierPosition(leftFamilyPosition, leftFamilyLength), leftQualifierLength,
|
||||
right.getQualifierByteBuffer(),
|
||||
right.getQualifierPosition(rightFamilyPosition, rightFamilyLength),
|
||||
rightQualifierLength);
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
// Timestamps.
|
||||
diff = compareTimestampsInternal(left.getTimestamp(leftKeyLength),
|
||||
right.getTimestamp(rightKeyLength));
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
// Compare types. Let the delete types sort ahead of puts; i.e. types
|
||||
// of higher numbers sort before those of lesser numbers. Maximum (255)
|
||||
// appears ahead of everything, and minimum (0) appears after
|
||||
// everything.
|
||||
return (0xff & rightType) - (0xff & leftType);
|
||||
return ignoreSequenceid? diff: Long.compare(b.getSequenceId(), a.getSequenceId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -254,7 +182,7 @@ public class CellComparatorImpl implements CellComparator {
|
|||
return compareRows(left, left.getRowLength(), right, right.getRowLength());
|
||||
}
|
||||
|
||||
int compareRows(final Cell left, int leftRowLength, final Cell right, int rightRowLength) {
|
||||
static int compareRows(final Cell left, int leftRowLength, final Cell right, int rightRowLength) {
|
||||
// left and right can be exactly the same at the beginning of a row
|
||||
if (left == right) {
|
||||
return 0;
|
||||
|
@ -341,7 +269,7 @@ public class CellComparatorImpl implements CellComparator {
|
|||
return diff;
|
||||
}
|
||||
|
||||
diff = compareTimestamps(left, right);
|
||||
diff = compareTimestamps(left.getTimestamp(), right.getTimestamp());
|
||||
if (diff != 0) {
|
||||
return diff;
|
||||
}
|
||||
|
@ -355,16 +283,12 @@ public class CellComparatorImpl implements CellComparator {
|
|||
|
||||
@Override
|
||||
public int compareTimestamps(final Cell left, final Cell right) {
|
||||
return compareTimestampsInternal(left.getTimestamp(), right.getTimestamp());
|
||||
return compareTimestamps(left.getTimestamp(), right.getTimestamp());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTimestamps(final long ltimestamp, final long rtimestamp) {
|
||||
return compareTimestampsInternal(ltimestamp, rtimestamp);
|
||||
}
|
||||
|
||||
private static final int compareTimestampsInternal(final long ltimestamp, final long rtimestamp) {
|
||||
// Swap the times so sort is newest to oldest, descending.
|
||||
// Swap order we pass into compare so we get DESCENDING order.
|
||||
return Long.compare(rtimestamp, ltimestamp);
|
||||
}
|
||||
|
||||
|
@ -373,6 +297,7 @@ public class CellComparatorImpl implements CellComparator {
|
|||
* {@link KeyValue}s.
|
||||
*/
|
||||
public static class MetaCellComparator extends CellComparatorImpl {
|
||||
// TODO: Do we need a ByteBufferKeyValue version of this?
|
||||
@Override
|
||||
public int compareRows(final Cell left, final Cell right) {
|
||||
return compareRows(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
|
||||
|
@ -451,5 +376,15 @@ public class CellComparatorImpl implements CellComparator {
|
|||
right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getSimpleComparator() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getSimpleComparator() {
|
||||
return new BBKVComparator(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,9 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* must implement this.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ExtendedCell extends RawCell, HeapSize, Cloneable {
|
||||
|
||||
public interface ExtendedCell extends RawCell, HeapSize {
|
||||
int CELL_NOT_BASED_ON_CHUNK = -1;
|
||||
|
||||
/**
|
||||
* Write this cell to an OutputStream in a {@link KeyValue} format.
|
||||
* <br> KeyValue format <br>
|
||||
|
@ -87,6 +87,13 @@ public interface ExtendedCell extends RawCell, HeapSize, Cloneable {
|
|||
getValueLength(), getTagsLength(), withTags);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Serialized size (defaults to include tag length).
|
||||
*/
|
||||
default int getSerializedSize() {
|
||||
return getSerializedSize(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write this Cell into the given buf's offset in a {@link KeyValue} format.
|
||||
* @param buf The buffer where to write the Cell.
|
||||
|
@ -108,7 +115,8 @@ public interface ExtendedCell extends RawCell, HeapSize, Cloneable {
|
|||
/**
|
||||
* Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized
|
||||
* chunks as in case of MemstoreLAB
|
||||
* @return the chunk id if the cell is backed by fixed sized Chunks, else return -1
|
||||
* @return the chunk id if the cell is backed by fixed sized Chunks, else return
|
||||
* {@link #CELL_NOT_BASED_ON_CHUNK}; i.e. -1.
|
||||
*/
|
||||
default int getChunkId() {
|
||||
return CELL_NOT_BASED_ON_CHUNK;
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class IndividualBytesFieldCell implements ExtendedCell {
|
||||
public class IndividualBytesFieldCell implements ExtendedCell, Cloneable {
|
||||
|
||||
private static final long FIXED_OVERHEAD = ClassSize.align( // do alignment(padding gap)
|
||||
ClassSize.OBJECT // object header
|
||||
|
|
|
@ -76,7 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
* length and actual tag bytes length.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class KeyValue implements ExtendedCell {
|
||||
public class KeyValue implements ExtendedCell, Cloneable {
|
||||
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<>();
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class);
|
||||
|
|
|
@ -50,8 +50,7 @@ public final class ByteBufferUtils {
|
|||
public final static int NEXT_BIT_MASK = 1 << 7;
|
||||
@VisibleForTesting
|
||||
final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
|
||||
@VisibleForTesting
|
||||
final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
|
||||
public final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
|
||||
|
||||
private ByteBufferUtils() {
|
||||
}
|
||||
|
@ -630,6 +629,8 @@ public final class ByteBufferUtils {
|
|||
}
|
||||
|
||||
public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
|
||||
// NOTE: This method is copied over in BBKVComparator!!!!! For perf reasons. If you make
|
||||
// changes here, make them there too!!!!
|
||||
if (UNSAFE_UNALIGNED) {
|
||||
long offset1Adj, offset2Adj;
|
||||
Object refObj1 = null, refObj2 = null;
|
||||
|
@ -671,9 +672,12 @@ public final class ByteBufferUtils {
|
|||
return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
|
||||
}
|
||||
|
||||
// The below two methods show up in lots of places. Versions of them in commons util and in
|
||||
// Cassandra. In guava too? They are copied from ByteBufferUtils. They are here as static
|
||||
// privates. Seems to make code smaller and make Hotspot happier (comes of compares and study
|
||||
// of compiled code via jitwatch).
|
||||
|
||||
public static int compareTo(byte [] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
|
||||
// This method is nearly same as the compareTo that follows but hard sharing code given
|
||||
// byte array and bytebuffer types and this is a hot code path
|
||||
if (UNSAFE_UNALIGNED) {
|
||||
long offset2Adj;
|
||||
Object refObj2 = null;
|
||||
|
@ -738,7 +742,7 @@ public final class ByteBufferUtils {
|
|||
long lw = UnsafeAccess.theUnsafe.getLong(obj1, o1 + (long) i);
|
||||
long rw = UnsafeAccess.theUnsafe.getLong(obj2, o2 + (long) i);
|
||||
if (lw != rw) {
|
||||
if (!UnsafeAccess.littleEndian) {
|
||||
if (!UnsafeAccess.LITTLE_ENDIAN) {
|
||||
return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -1549,7 +1549,7 @@ public class Bytes implements Comparable<Bytes> {
|
|||
long lw = theUnsafe.getLong(buffer1, offset1Adj + i);
|
||||
long rw = theUnsafe.getLong(buffer2, offset2Adj + i);
|
||||
if (lw != rw) {
|
||||
if(!UnsafeAccess.littleEndian) {
|
||||
if(!UnsafeAccess.LITTLE_ENDIAN) {
|
||||
return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ public final class UnsafeAccess {
|
|||
/** The offset to the first element in a byte array. */
|
||||
public static final long BYTE_ARRAY_BASE_OFFSET;
|
||||
|
||||
static final boolean littleEndian = ByteOrder.nativeOrder()
|
||||
public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder()
|
||||
.equals(ByteOrder.LITTLE_ENDIAN);
|
||||
|
||||
// This number limits the number of bytes to copy per call to Unsafe's
|
||||
|
@ -81,7 +81,7 @@ public final class UnsafeAccess {
|
|||
* @return the short value
|
||||
*/
|
||||
public static short toShort(byte[] bytes, int offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Short.reverseBytes(theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
|
||||
} else {
|
||||
return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
|
||||
|
@ -95,7 +95,7 @@ public final class UnsafeAccess {
|
|||
* @return the int value
|
||||
*/
|
||||
public static int toInt(byte[] bytes, int offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
|
||||
} else {
|
||||
return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
|
||||
|
@ -109,7 +109,7 @@ public final class UnsafeAccess {
|
|||
* @return the long value
|
||||
*/
|
||||
public static long toLong(byte[] bytes, int offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Long.reverseBytes(theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
|
||||
} else {
|
||||
return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
|
||||
|
@ -125,7 +125,7 @@ public final class UnsafeAccess {
|
|||
* @return incremented offset
|
||||
*/
|
||||
public static int putShort(byte[] bytes, int offset, short val) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
val = Short.reverseBytes(val);
|
||||
}
|
||||
theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
|
||||
|
@ -140,7 +140,7 @@ public final class UnsafeAccess {
|
|||
* @return incremented offset
|
||||
*/
|
||||
public static int putInt(byte[] bytes, int offset, int val) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
val = Integer.reverseBytes(val);
|
||||
}
|
||||
theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
|
||||
|
@ -155,7 +155,7 @@ public final class UnsafeAccess {
|
|||
* @return incremented offset
|
||||
*/
|
||||
public static int putLong(byte[] bytes, int offset, long val) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
val = Long.reverseBytes(val);
|
||||
}
|
||||
theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
|
||||
|
@ -172,7 +172,7 @@ public final class UnsafeAccess {
|
|||
* @return short value at offset
|
||||
*/
|
||||
public static short toShort(ByteBuffer buf, int offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Short.reverseBytes(getAsShort(buf, offset));
|
||||
}
|
||||
return getAsShort(buf, offset);
|
||||
|
@ -186,7 +186,7 @@ public final class UnsafeAccess {
|
|||
* @return short value at offset
|
||||
*/
|
||||
public static short toShort(Object ref, long offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Short.reverseBytes(theUnsafe.getShort(ref, offset));
|
||||
}
|
||||
return theUnsafe.getShort(ref, offset);
|
||||
|
@ -214,7 +214,7 @@ public final class UnsafeAccess {
|
|||
* @return int value at offset
|
||||
*/
|
||||
public static int toInt(ByteBuffer buf, int offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Integer.reverseBytes(getAsInt(buf, offset));
|
||||
}
|
||||
return getAsInt(buf, offset);
|
||||
|
@ -228,7 +228,7 @@ public final class UnsafeAccess {
|
|||
* @return int value at offset
|
||||
*/
|
||||
public static int toInt(Object ref, long offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Integer.reverseBytes(theUnsafe.getInt(ref, offset));
|
||||
}
|
||||
return theUnsafe.getInt(ref, offset);
|
||||
|
@ -256,7 +256,7 @@ public final class UnsafeAccess {
|
|||
* @return long value at offset
|
||||
*/
|
||||
public static long toLong(ByteBuffer buf, int offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Long.reverseBytes(getAsLong(buf, offset));
|
||||
}
|
||||
return getAsLong(buf, offset);
|
||||
|
@ -270,7 +270,7 @@ public final class UnsafeAccess {
|
|||
* @return long value at offset
|
||||
*/
|
||||
public static long toLong(Object ref, long offset) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
return Long.reverseBytes(theUnsafe.getLong(ref, offset));
|
||||
}
|
||||
return theUnsafe.getLong(ref, offset);
|
||||
|
@ -297,7 +297,7 @@ public final class UnsafeAccess {
|
|||
* @return incremented offset
|
||||
*/
|
||||
public static int putInt(ByteBuffer buf, int offset, int val) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
val = Integer.reverseBytes(val);
|
||||
}
|
||||
if (buf.isDirect()) {
|
||||
|
@ -402,7 +402,7 @@ public final class UnsafeAccess {
|
|||
* @return incremented offset
|
||||
*/
|
||||
public static int putShort(ByteBuffer buf, int offset, short val) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
val = Short.reverseBytes(val);
|
||||
}
|
||||
if (buf.isDirect()) {
|
||||
|
@ -421,7 +421,7 @@ public final class UnsafeAccess {
|
|||
* @return incremented offset
|
||||
*/
|
||||
public static int putLong(ByteBuffer buf, int offset, long val) {
|
||||
if (littleEndian) {
|
||||
if (LITTLE_ENDIAN) {
|
||||
val = Long.reverseBytes(val);
|
||||
}
|
||||
if (buf.isDirect()) {
|
||||
|
|
|
@ -24,6 +24,8 @@ import static org.junit.Assert.assertFalse;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -67,9 +69,23 @@ public class TestByteBufferKeyValue {
|
|||
assertTrue(CellComparatorImpl.COMPARATOR.compare(cell1, cell3) < 0);
|
||||
Cell cell4 = getOffheapCell(row1, Bytes.toBytes("f"), qual2);
|
||||
assertTrue(CellComparatorImpl.COMPARATOR.compare(cell1, cell4) > 0);
|
||||
BBKVComparator comparator = new BBKVComparator(null);
|
||||
assertTrue(comparator.compare(cell1, cell2) < 0);
|
||||
assertTrue(comparator.compare(cell1, cell3) < 0);
|
||||
assertTrue(comparator.compare(cell1, cell4) > 0);
|
||||
ByteBuffer buf = ByteBuffer.allocate(row1.length);
|
||||
ByteBufferUtils.copyFromArrayToBuffer(buf, row1, 0, row1.length);
|
||||
|
||||
ConcurrentSkipListMap<ByteBufferKeyValue, ByteBufferKeyValue> map =
|
||||
new ConcurrentSkipListMap<>(comparator);
|
||||
map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1);
|
||||
map.put((ByteBufferKeyValue)cell2, (ByteBufferKeyValue)cell2);
|
||||
map.put((ByteBufferKeyValue)cell3, (ByteBufferKeyValue)cell3);
|
||||
map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1);
|
||||
map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1);
|
||||
}
|
||||
|
||||
private Cell getOffheapCell(byte [] row, byte [] family, byte [] qualifier) {
|
||||
private static Cell getOffheapCell(byte [] row, byte [] family, byte [] qualifier) {
|
||||
KeyValue kvCell = new KeyValue(row, family, qualifier, 0L, Type.Put, row);
|
||||
ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
|
||||
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
|
||||
|
|
|
@ -33,7 +33,9 @@ import java.util.stream.Collectors;
|
|||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -200,8 +202,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup "
|
||||
+ request.getTargetGroup());
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup());
|
||||
}
|
||||
checkPermission("moveServers");
|
||||
groupAdminServer.moveServers(hostPorts, request.getTargetGroup());
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveServers(hostPorts, request.getTargetGroup());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
|
@ -219,8 +227,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup "
|
||||
+ request.getTargetGroup());
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup());
|
||||
}
|
||||
checkPermission("moveTables");
|
||||
groupAdminServer.moveTables(tables, request.getTargetGroup());
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
|
@ -233,8 +247,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
|
||||
LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName());
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName());
|
||||
}
|
||||
checkPermission("addRSGroup");
|
||||
groupAdminServer.addRSGroup(request.getRSGroupName());
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
|
@ -248,8 +268,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
RemoveRSGroupResponse.newBuilder();
|
||||
LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName());
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName());
|
||||
}
|
||||
checkPermission("removeRSGroup");
|
||||
groupAdminServer.removeRSGroup(request.getRSGroupName());
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
|
@ -263,8 +289,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group=" +
|
||||
request.getRSGroupName());
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName());
|
||||
}
|
||||
checkPermission("balanceRSGroup");
|
||||
builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
|
||||
boolean balancerRan = groupAdminServer.balanceRSGroup(request.getRSGroupName());
|
||||
builder.setBalanceRan(balancerRan);
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(),
|
||||
balancerRan);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
builder.setBalanceRan(false);
|
||||
|
@ -298,9 +332,9 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
+ hp);
|
||||
try {
|
||||
checkPermission("getRSGroupInfoOfServer");
|
||||
RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(hp);
|
||||
if (RSGroupInfo != null) {
|
||||
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
|
||||
RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp);
|
||||
if (info != null) {
|
||||
builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(info));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
|
@ -323,8 +357,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts
|
||||
+ " and tables " + tables + " to rsgroup" + request.getTargetGroup());
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables,
|
||||
request.getTargetGroup());
|
||||
}
|
||||
checkPermission("moveServersAndTables");
|
||||
groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup());
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables,
|
||||
request.getTargetGroup());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
|
@ -344,8 +386,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
LOG.info(master.getClientIdAuditPrefix()
|
||||
+ " remove decommissioned servers from rsgroup: " + servers);
|
||||
try {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preRemoveServers(servers);
|
||||
}
|
||||
checkPermission("removeServers");
|
||||
groupAdminServer.removeServers(servers);
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postRemoveServers(servers);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
CoprocessorRpcUtils.setControllerException(controller, e);
|
||||
}
|
||||
|
@ -354,12 +402,20 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
|
|||
}
|
||||
|
||||
boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException {
|
||||
String groupName =
|
||||
String groupName;
|
||||
try {
|
||||
groupName =
|
||||
master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString())
|
||||
.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
|
||||
if (groupName == null) {
|
||||
if (groupName == null) {
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
} catch (MasterNotRunningException | PleaseHoldException e) {
|
||||
LOG.info("Master has not initialized yet; temporarily using default RSGroup '" +
|
||||
RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
|
||||
RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
|
||||
if (rsGroupInfo == null) {
|
||||
throw new ConstraintException(
|
||||
|
|
|
@ -291,9 +291,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
// Hold a lock on the manager instance while moving servers to prevent
|
||||
// another writer changing our state while we are working.
|
||||
synchronized (rsGroupInfoManager) {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
|
||||
}
|
||||
// Presume first server's source group. Later ensure all servers are from this group.
|
||||
Address firstServer = servers.iterator().next();
|
||||
RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer);
|
||||
|
@ -370,9 +367,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
}
|
||||
} while (foundRegionsToMove);
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName);
|
||||
}
|
||||
LOG.info("Move server done: " + srcGrp.getName() + "=>" + targetGroupName);
|
||||
}
|
||||
}
|
||||
|
@ -390,9 +384,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
// Hold a lock on the manager instance while moving servers to prevent
|
||||
// another writer changing our state while we are working.
|
||||
synchronized (rsGroupInfoManager) {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
|
||||
}
|
||||
if(targetGroup != null) {
|
||||
RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup);
|
||||
if(destGroup == null) {
|
||||
|
@ -430,22 +421,12 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addRSGroup(String name) throws IOException {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preAddRSGroup(name);
|
||||
}
|
||||
rsGroupInfoManager.addRSGroup(new RSGroupInfo(name));
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postAddRSGroup(name);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -453,9 +434,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
// Hold a lock on the manager instance while moving servers to prevent
|
||||
// another writer changing our state while we are working.
|
||||
synchronized (rsGroupInfoManager) {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preRemoveRSGroup(name);
|
||||
}
|
||||
RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name);
|
||||
if (rsGroupInfo == null) {
|
||||
throw new ConstraintException("RSGroup " + name + " does not exist");
|
||||
|
@ -480,9 +458,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
}
|
||||
}
|
||||
rsGroupInfoManager.removeRSGroup(name);
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postRemoveRSGroup(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -498,9 +473,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preBalanceRSGroup(groupName);
|
||||
}
|
||||
if (getRSGroupInfo(groupName) == null) {
|
||||
throw new ConstraintException("RSGroup does not exist: "+groupName);
|
||||
}
|
||||
|
@ -542,9 +514,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
LOG.info("RSGroup balance " + groupName + " completed after " +
|
||||
(System.currentTimeMillis()-startTime) + " seconds");
|
||||
}
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan);
|
||||
}
|
||||
return balancerRan;
|
||||
}
|
||||
}
|
||||
|
@ -575,9 +544,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
// Hold a lock on the manager instance while moving servers and tables to prevent
|
||||
// another writer changing our state while we are working.
|
||||
synchronized (rsGroupInfoManager) {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, targetGroup);
|
||||
}
|
||||
//check servers and tables status
|
||||
checkServersAndTables(servers, tables, targetGroup);
|
||||
|
||||
|
@ -589,10 +555,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
moveRegionsFromServers(servers, tables, targetGroup);
|
||||
//move regions which should belong to these servers
|
||||
moveRegionsToServers(servers, tables, targetGroup);
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup);
|
||||
}
|
||||
}
|
||||
LOG.info("Move servers and tables done. Severs :"
|
||||
+ servers + " , Tables : " + tables + " => " + targetGroup);
|
||||
|
@ -607,15 +569,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
// Hold a lock on the manager instance while moving servers to prevent
|
||||
// another writer changing our state while we are working.
|
||||
synchronized (rsGroupInfoManager) {
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().preRemoveServers(servers);
|
||||
}
|
||||
//check the set of servers
|
||||
checkForDeadOrOnlineServers(servers);
|
||||
rsGroupInfoManager.removeServers(servers);
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postRemoveServers(servers);
|
||||
}
|
||||
LOG.info("Remove decommissioned servers " + servers + " from rsgroup done.");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -200,6 +200,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
for (RegionInfo region : regions.keySet()) {
|
||||
if (!misplacedRegions.contains(region)) {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.info("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
groupToRegion.put(groupName, region);
|
||||
}
|
||||
}
|
||||
|
@ -221,6 +225,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
|
||||
for (RegionInfo region : misplacedRegions) {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.info("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
|
||||
List<ServerName> candidateList = filterOfflineServers(info, servers);
|
||||
ServerName server = this.internalBalancer.randomAssignment(region,
|
||||
|
@ -263,7 +271,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
for (RegionInfo region : regions) {
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.warn("Group for table "+region.getTable()+" is null");
|
||||
LOG.info("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
regionMap.put(groupName, region);
|
||||
}
|
||||
|
@ -322,8 +331,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
for(Map.Entry<RegionInfo, ServerName> region : regions.entrySet()) {
|
||||
RegionInfo regionInfo = region.getKey();
|
||||
ServerName assignedServer = region.getValue();
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(rsGroupInfoManager.
|
||||
getRSGroupOfTable(regionInfo.getTable()));
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.info("Group not found for table " + regionInfo.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
|
||||
if (assignedServer == null) {
|
||||
LOG.debug("There is no assigned server for {}", region);
|
||||
continue;
|
||||
|
@ -358,8 +371,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
for (RegionInfo region : regions) {
|
||||
RSGroupInfo targetRSGInfo = null;
|
||||
try {
|
||||
targetRSGInfo = rsGroupInfoManager.getRSGroup(
|
||||
rsGroupInfoManager.getRSGroupOfTable(region.getTable()));
|
||||
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
|
||||
if (groupName == null) {
|
||||
LOG.info("Group not found for table " + region.getTable() + ", using default");
|
||||
groupName = RSGroupInfo.DEFAULT_GROUP;
|
||||
}
|
||||
targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
|
||||
} catch (IOException exp) {
|
||||
LOG.debug("RSGroup information null for region of table " + region.getTable(),
|
||||
exp);
|
||||
|
|
|
@ -23,7 +23,6 @@ import com.google.protobuf.ServiceException;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -72,10 +71,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
|||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
||||
import org.apache.hadoop.hbase.security.access.AccessControlLists;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -249,7 +245,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
public synchronized void moveTables(Set<TableName> tableNames, String groupName)
|
||||
throws IOException {
|
||||
if (groupName != null && !rsGroupMap.containsKey(groupName)) {
|
||||
throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
|
||||
throw new DoNotRetryIOException("Group "+groupName+" does not exist");
|
||||
}
|
||||
|
||||
Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
|
||||
|
@ -408,18 +404,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
|
|||
for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
|
||||
orphanTables.add(TableName.valueOf(entry));
|
||||
}
|
||||
|
||||
List<TableName> specialTables =
|
||||
new ArrayList<TableName>(Arrays.asList(AccessControlLists.ACL_TABLE_NAME,
|
||||
TableName.META_TABLE_NAME, TableName.NAMESPACE_TABLE_NAME, RSGROUP_TABLE_NAME));
|
||||
// if quota is enabled, add corresponding system table to special tables list
|
||||
if (QuotaUtil.isQuotaEnabled(conn.getConfiguration())) {
|
||||
specialTables.add(QuotaTableUtil.QUOTA_TABLE_NAME);
|
||||
}
|
||||
|
||||
for (TableName table : specialTables) {
|
||||
orphanTables.add(table);
|
||||
}
|
||||
for (RSGroupInfo group: groupList) {
|
||||
if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
|
||||
orphanTables.removeAll(group.getTables());
|
||||
|
|
|
@ -23,6 +23,9 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -36,6 +39,11 @@ import org.apache.hadoop.hbase.Waiter;
|
|||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
|
@ -69,7 +77,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroups.class);
|
||||
private static boolean INIT = false;
|
||||
private static RSGroupAdminEndpoint rsGroupAdminEndpoint;
|
||||
|
||||
private static CPMasterObserver observer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
|
@ -80,7 +88,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
|
||||
RSGroupBasedLoadBalancer.class.getName());
|
||||
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
|
||||
RSGroupAdminEndpoint.class.getName());
|
||||
RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
|
||||
// Enable quota for testRSGroupsWithHBaseQuota()
|
||||
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
|
||||
|
@ -104,8 +112,10 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
admin.setBalancerRunning(false,true);
|
||||
rsGroupAdmin = new VerifyingRSGroupAdminClient(
|
||||
new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration());
|
||||
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
|
||||
observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName());
|
||||
rsGroupAdminEndpoint = (RSGroupAdminEndpoint)
|
||||
master.getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName());
|
||||
host.findCoprocessor(RSGroupAdminEndpoint.class.getName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -148,6 +158,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
} catch (Exception ex) {
|
||||
LOG.warn("Got this on setup, FYI", ex);
|
||||
}
|
||||
assertTrue(observer.preMoveServersCalled);
|
||||
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
|
@ -221,6 +232,9 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
String groupName = tablePrefix+"_foo";
|
||||
LOG.info("testNamespaceConstraint");
|
||||
rsGroupAdmin.addRSGroup(groupName);
|
||||
assertTrue(observer.preAddRSGroupCalled);
|
||||
assertTrue(observer.postAddRSGroupCalled);
|
||||
|
||||
admin.createNamespace(NamespaceDescriptor.create(nsName)
|
||||
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
|
||||
.build());
|
||||
|
@ -241,6 +255,8 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
//test add non-existent group
|
||||
admin.deleteNamespace(nsName);
|
||||
rsGroupAdmin.removeRSGroup(groupName);
|
||||
assertTrue(observer.preRemoveRSGroupCalled);
|
||||
assertTrue(observer.postRemoveRSGroupCalled);
|
||||
try {
|
||||
admin.createNamespace(NamespaceDescriptor.create(nsName)
|
||||
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo")
|
||||
|
@ -261,6 +277,120 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
it.next();
|
||||
}
|
||||
|
||||
public static class CPMasterObserver implements MasterCoprocessor, MasterObserver {
|
||||
boolean preBalanceRSGroupCalled = false;
|
||||
boolean postBalanceRSGroupCalled = false;
|
||||
boolean preMoveServersCalled = false;
|
||||
boolean postMoveServersCalled = false;
|
||||
boolean preMoveTablesCalled = false;
|
||||
boolean postMoveTablesCalled = false;
|
||||
boolean preAddRSGroupCalled = false;
|
||||
boolean postAddRSGroupCalled = false;
|
||||
boolean preRemoveRSGroupCalled = false;
|
||||
boolean postRemoveRSGroupCalled = false;
|
||||
boolean preRemoveServersCalled = false;
|
||||
boolean postRemoveServersCalled = false;
|
||||
boolean preMoveServersAndTables = false;
|
||||
boolean postMoveServersAndTables = false;
|
||||
|
||||
@Override
|
||||
public Optional<MasterObserver> getMasterObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
@Override
|
||||
public void preMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
|
||||
preMoveServersAndTables = true;
|
||||
}
|
||||
@Override
|
||||
public void postMoveServersAndTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers, Set<TableName> tables, String targetGroup) throws IOException {
|
||||
postMoveServersAndTables = true;
|
||||
}
|
||||
@Override
|
||||
public void preRemoveServers(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
preRemoveServersCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postRemoveServers(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers) throws IOException {
|
||||
postRemoveServersCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException {
|
||||
preRemoveRSGroupCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postRemoveRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException {
|
||||
postRemoveRSGroupCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException {
|
||||
preAddRSGroupCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postAddRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String name) throws IOException {
|
||||
postAddRSGroupCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<TableName> tables, String targetGroup) throws IOException {
|
||||
preMoveTablesCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postMoveTables(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<TableName> tables, String targetGroup) throws IOException {
|
||||
postMoveTablesCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers, String targetGroup) throws IOException {
|
||||
preMoveServersCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postMoveServers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
Set<Address> servers, String targetGroup) throws IOException {
|
||||
postMoveServersCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void preBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String groupName) throws IOException {
|
||||
preBalanceRSGroupCalled = true;
|
||||
}
|
||||
@Override
|
||||
public void postBalanceRSGroup(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
String groupName, boolean balancerRan) throws IOException {
|
||||
postBalanceRSGroupCalled = true;
|
||||
}
|
||||
}
|
||||
@Test
|
||||
public void testMoveServersAndTables() throws Exception {
|
||||
super.testMoveServersAndTables();
|
||||
assertTrue(observer.preMoveServersAndTables);
|
||||
assertTrue(observer.postMoveServersAndTables);
|
||||
}
|
||||
@Test
|
||||
public void testTableMoveTruncateAndDrop() throws Exception {
|
||||
super.testTableMoveTruncateAndDrop();
|
||||
assertTrue(observer.preMoveTablesCalled);
|
||||
assertTrue(observer.postMoveTablesCalled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveServers() throws Exception {
|
||||
super.testRemoveServers();
|
||||
assertTrue(observer.preRemoveServersCalled);
|
||||
assertTrue(observer.postRemoveServersCalled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMisplacedRegions() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
|
||||
|
@ -277,6 +407,8 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
admin.setBalancerRunning(true,true);
|
||||
assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName()));
|
||||
admin.setBalancerRunning(false,true);
|
||||
assertTrue(observer.preBalanceRSGroupCalled);
|
||||
assertTrue(observer.postBalanceRSGroupCalled);
|
||||
|
||||
TEST_UTIL.waitFor(60000, new Predicate<Exception>() {
|
||||
@Override
|
||||
|
|
|
@ -332,8 +332,8 @@ public class HFile {
|
|||
try {
|
||||
ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
|
||||
} catch (UnsupportedOperationException uoe) {
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Unable to set drop behind on " + path, uoe);
|
||||
else if (LOG.isDebugEnabled()) LOG.debug("Unable to set drop behind on " + path);
|
||||
LOG.trace("Unable to set drop behind on {}", path, uoe);
|
||||
LOG.debug("Unable to set drop behind on {}", path.getName());
|
||||
}
|
||||
}
|
||||
return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, fileContext);
|
||||
|
|
|
@ -53,7 +53,7 @@ public class CellSet implements NavigableSet<Cell> {
|
|||
private final int numUniqueKeys;
|
||||
|
||||
CellSet(final CellComparator c) {
|
||||
this.delegatee = new ConcurrentSkipListMap<>(c);
|
||||
this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
|
||||
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
|
|||
@InterfaceAudience.Private
|
||||
public class ChunkCreator {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ChunkCreator.class);
|
||||
// monotonically increasing chunkid
|
||||
// monotonically increasing chunkid. Starts at 1.
|
||||
private AtomicInteger chunkID = new AtomicInteger(1);
|
||||
// maps the chunk against the monotonically increasing chunk id. We need to preserve the
|
||||
// natural ordering of the key
|
||||
|
|
|
@ -2552,8 +2552,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
|
||||
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," +
|
||||
" memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) +
|
||||
" memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) +
|
||||
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
|
||||
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
|
||||
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
|
||||
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
|
||||
}
|
||||
|
@ -2753,10 +2753,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
long time = EnvironmentEdgeManager.currentTime() - startTime;
|
||||
MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
|
||||
long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize();
|
||||
String msg = "Finished memstore flush;"
|
||||
+ " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
|
||||
+ ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
|
||||
+ ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
|
||||
String msg = "Finished flush of"
|
||||
+ " dataSize ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
|
||||
+ ", heapSize ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
|
||||
+ ", currentSize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
|
||||
+ " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid="
|
||||
+ flushOpSeqId + ", compaction requested=" + compactionRequested
|
||||
+ ((wal == null) ? "; wal=null" : "");
|
||||
|
@ -4236,7 +4236,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @param cellItr
|
||||
* @param now
|
||||
*/
|
||||
public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
|
||||
private static void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
|
||||
throws IOException {
|
||||
for (List<Cell> cells: cellItr) {
|
||||
if (cells == null) continue;
|
||||
|
@ -4291,12 +4291,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
requestFlush();
|
||||
// Don't print current limit because it will vary too much. The message is used as a key
|
||||
// over in RetriesExhaustedWithDetailsException processing.
|
||||
throw new RegionTooBusyException("Over memstore limit; regionName=" +
|
||||
throw new RegionTooBusyException("Over memstore limit=" +
|
||||
org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize) +
|
||||
", regionName=" +
|
||||
(this.getRegionInfo() == null? "unknown": this.getRegionInfo().getEncodedName()) +
|
||||
", server=" + (this.getRegionServerServices() == null ? "unknown":
|
||||
this.getRegionServerServices().getServerName()) +
|
||||
", blockingMemStoreSize=" +
|
||||
org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(blockingMemStoreSize));
|
||||
", server=" + (this.getRegionServerServices() == null? "unknown":
|
||||
this.getRegionServerServices().getServerName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8602,4 +8602,5 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public void requestFlush(FlushLifeCycleTracker tracker) throws IOException {
|
||||
requestFlush0(tracker);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -120,12 +120,13 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
*/
|
||||
@Override
|
||||
public Cell forceCopyOfBigCellInto(Cell cell) {
|
||||
int size = KeyValueUtil.length(cell) + ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||
int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
|
||||
KeyValueUtil.length(cell);
|
||||
size += ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||
Preconditions.checkArgument(size >= 0, "negative size");
|
||||
if (size <= dataChunkSize) {
|
||||
// Using copyCellInto for cells which are bigger than the original maxAlloc
|
||||
Cell newCell = copyCellInto(cell, dataChunkSize);
|
||||
return newCell;
|
||||
return copyCellInto(cell, dataChunkSize);
|
||||
} else {
|
||||
Chunk c = getNewExternalChunk(size);
|
||||
int allocOffset = c.alloc(size);
|
||||
|
@ -134,7 +135,8 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
}
|
||||
|
||||
private Cell copyCellInto(Cell cell, int maxAlloc) {
|
||||
int size = KeyValueUtil.length(cell);
|
||||
int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
|
||||
KeyValueUtil.length(cell);
|
||||
Preconditions.checkArgument(size >= 0, "negative size");
|
||||
// Callers should satisfy large allocations directly from JVM since they
|
||||
// don't cause fragmentation as badly.
|
||||
|
@ -169,7 +171,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
* Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
|
||||
* out of it
|
||||
*/
|
||||
private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
|
||||
private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
|
||||
int tagsLen = cell.getTagsLength();
|
||||
if (cell instanceof ExtendedCell) {
|
||||
((ExtendedCell) cell).write(buf, offset);
|
||||
|
@ -255,8 +257,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
|||
*/
|
||||
private Chunk getOrMakeChunk() {
|
||||
// Try to get the chunk
|
||||
Chunk c;
|
||||
c = currChunk.get();
|
||||
Chunk c = currChunk.get();
|
||||
if (c != null) {
|
||||
return c;
|
||||
}
|
||||
|
|
|
@ -43,21 +43,22 @@ import org.slf4j.LoggerFactory;
|
|||
* target cluster is an HBase cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
||||
justification="Thinks zkw needs to be synchronized access but should be fine as is.")
|
||||
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
implements Abortable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
|
||||
|
||||
private ZKWatcher zkw = null; // FindBugs: MT_CORRECTNESS
|
||||
private Object zkwLock = new Object();
|
||||
private ZKWatcher zkw = null;
|
||||
|
||||
private List<ServerName> regionServers = new ArrayList<>(0);
|
||||
private long lastRegionServerUpdate;
|
||||
|
||||
protected void disconnect() {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
synchronized (zkwLock) {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,7 +113,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
|||
public synchronized UUID getPeerUUID() {
|
||||
UUID peerUUID = null;
|
||||
try {
|
||||
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
|
||||
synchronized (zkwLock) {
|
||||
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
reconnect(ke);
|
||||
}
|
||||
|
@ -124,7 +127,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
|||
* @return zk connection
|
||||
*/
|
||||
protected ZKWatcher getZkw() {
|
||||
return zkw;
|
||||
synchronized (zkwLock) {
|
||||
return zkw;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -132,10 +137,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
|||
* @throws IOException If anything goes wrong connecting
|
||||
*/
|
||||
void reloadZkWatcher() throws IOException {
|
||||
if (zkw != null) zkw.close();
|
||||
zkw = new ZKWatcher(ctx.getConfiguration(),
|
||||
synchronized (zkwLock) {
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
}
|
||||
zkw = new ZKWatcher(ctx.getConfiguration(),
|
||||
"connection to cluster: " + ctx.getPeerId(), this);
|
||||
getZkw().registerListener(new PeerRegionServerListener(this));
|
||||
zkw.registerListener(new PeerRegionServerListener(this));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,13 +182,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
|||
* for this peer cluster
|
||||
* @return list of addresses
|
||||
*/
|
||||
// Synchronize peer cluster connection attempts to avoid races and rate
|
||||
// limit connections when multiple replication sources try to connect to
|
||||
// the peer cluster. If the peer cluster is down we can get out of control
|
||||
// over time.
|
||||
public synchronized List<ServerName> getRegionServers() {
|
||||
public List<ServerName> getRegionServers() {
|
||||
try {
|
||||
setRegionServers(fetchSlavesAddresses(this.getZkw()));
|
||||
// Synchronize peer cluster connection attempts to avoid races and rate
|
||||
// limit connections when multiple replication sources try to connect to
|
||||
// the peer cluster. If the peer cluster is down we can get out of control
|
||||
// over time.
|
||||
synchronized (zkwLock) {
|
||||
setRegionServers(fetchSlavesAddresses(zkw));
|
||||
}
|
||||
} catch (KeeperException ke) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Fetch slaves addresses failed", ke);
|
||||
|
|
|
@ -47,9 +47,11 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
* implementations:
|
||||
* <ul>
|
||||
* <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
|
||||
* "filesystem"</li>
|
||||
* "asyncfs"</li>
|
||||
* <li><em>asyncfs</em> : a provider that will run on top of an implementation of the Hadoop
|
||||
* FileSystem interface via an asynchronous client.</li>
|
||||
* <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
|
||||
* FileSystem interface, normally HDFS.</li>
|
||||
* FileSystem interface via HDFS's synchronous DFSClient.</li>
|
||||
* <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
|
||||
* server.</li>
|
||||
* </ul>
|
||||
|
|
|
@ -47,9 +47,10 @@ public class TestTagRewriteCell {
|
|||
// VisibilityController and AccessController)
|
||||
Cell trCell2 = PrivateCellUtil.createCell(trCell, new byte[fakeTagArrayLength]);
|
||||
|
||||
assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be larger than a " +
|
||||
"single TagRewriteCell's heapsize", trCellHeapSize < ((HeapSize)trCell2).heapSize());
|
||||
assertTrue("TagRewriteCell should have had nulled out tags array", ((HeapSize)trCell).heapSize() <
|
||||
trCellHeapSize);
|
||||
assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be " +
|
||||
"larger than a single TagRewriteCell's heapsize",
|
||||
trCellHeapSize < ((HeapSize)trCell2).heapSize());
|
||||
assertTrue("TagRewriteCell should have had nulled out tags array",
|
||||
((HeapSize)trCell).heapSize() < trCellHeapSize);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue