diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java
new file mode 100644
index 00000000000..c15d32110ad
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A comparator for case where {@link ByteBufferKeyValue} is prevalent type (BBKV
+ * is base-type in hbase2). Takes a general comparator as fallback in case types are NOT the
+ * expected ByteBufferKeyValue.
+ *
+ *
This is a tricked-out Comparator at heart of hbase read and write. It is in
+ * the HOT path so we try all sorts of ugly stuff so we can go faster. See below
+ * in this javadoc comment for the list.
+ *
+ *
Apply this comparator narrowly so it is fed exclusively ByteBufferKeyValues
+ * as much as is possible so JIT can settle (e.g. make one per ConcurrentSkipListMap
+ * in HStore).
+ *
+ *
Exploits specially added methods in BBKV to save on deserializations of shorts,
+ * longs, etc: i.e. calculating the family length requires row length; pass it in
+ * rather than recalculate it, and so on.
+ *
+ *
This comparator does static dispatch to private final methods so hotspot is comfortable
+ * deciding inline.
+ *
+ *
Measurement has it that we almost have it so all inlines from memstore
+ * ConcurrentSkipListMap on down to the (unsafe) intrinisics that do byte compare
+ * and deserialize shorts and ints; needs a bit more work.
+ *
+ *
Does not take a Type to compare: i.e. it is not a Comparator<Cell> or
+ * CellComparator<Cell> or Comparator<ByteBufferKeyValue> because that adds
+ * another method to the hierarchy -- from compare(Object, Object)
+ * to dynamic compare(Cell, Cell) to static private compare -- and inlining doesn't happen if
+ * hierarchy is too deep (it is the case here).
+ *
+ *
Be careful making changes. Compare perf before and after and look at what
+ * hotspot ends up generating before committing change (jitwatch is helpful here).
+ * Changing this one class doubled write throughput (HBASE-20483).
+ */
+@InterfaceAudience.Private
+public class BBKVComparator implements Comparator {
+ protected static final Logger LOG = LoggerFactory.getLogger(BBKVComparator.class);
+ private final Comparator fallback;
+
+ public BBKVComparator(Comparator fallback) {
+ this.fallback = fallback;
+ }
+
+ @Override
+ public int compare(Object l, Object r) {
+ // LOG.info("ltype={} rtype={}", l, r);
+ if ((l instanceof ByteBufferKeyValue) && (r instanceof ByteBufferKeyValue)) {
+ return compare((ByteBufferKeyValue)l, (ByteBufferKeyValue)r);
+ }
+ // Skip calling compare(Object, Object) and go direct to compare(Cell, Cell)
+ return this.fallback.compare((Cell)l, (Cell)r);
+ }
+
+ // TODO: Come back here. We get a few percentage points extra of throughput if this is a
+ // private method.
+ static final int compare(ByteBufferKeyValue left, ByteBufferKeyValue right) {
+ // NOTE: Same method is in CellComparatorImpl, also private, not shared, intentionally. Not
+ // sharing gets us a few percent more throughput in compares. If changes here or there, make
+ // sure done in both places.
+
+ // Compare Rows. Cache row length.
+ int leftRowLength = left.getRowLength();
+ int rightRowLength = right.getRowLength();
+ int diff = ByteBufferUtils.compareTo(left.getRowByteBuffer(), left.getRowPosition(),
+ leftRowLength,
+ right.getRowByteBuffer(), right.getRowPosition(), rightRowLength);
+ if (diff != 0) {
+ return diff;
+ }
+
+ // If the column is not specified, the "minimum" key type appears as latest in the sorted
+ // order, regardless of the timestamp. This is used for specifying the last key/value in a
+ // given row, because there is no "lexicographically last column" (it would be infinitely long).
+ // The "maximum" key type does not need this behavior. Copied from KeyValue. This is bad in that
+ // we can't do memcmp w/ special rules like this.
+ // TODO: Is there a test for this behavior?
+ int leftFamilyLengthPosition = left.getFamilyLengthPosition(leftRowLength);
+ int leftFamilyLength = left.getFamilyLength(leftFamilyLengthPosition);
+ int leftKeyLength = left.getKeyLength();
+ int leftQualifierLength = left.getQualifierLength(leftKeyLength, leftRowLength,
+ leftFamilyLength);
+
+ // No need of left row length below here.
+
+ byte leftType = left.getTypeByte(leftKeyLength);
+ if (leftFamilyLength + leftQualifierLength == 0 &&
+ leftType == KeyValue.Type.Minimum.getCode()) {
+ // left is "bigger", i.e. it appears later in the sorted order
+ return 1;
+ }
+
+ int rightFamilyLengthPosition = right.getFamilyLengthPosition(rightRowLength);
+ int rightFamilyLength = right.getFamilyLength(rightFamilyLengthPosition);
+ int rightKeyLength = right.getKeyLength();
+ int rightQualifierLength = right.getQualifierLength(rightKeyLength, rightRowLength,
+ rightFamilyLength);
+
+ // No need of right row length below here.
+
+ byte rightType = right.getTypeByte(rightKeyLength);
+ if (rightFamilyLength + rightQualifierLength == 0 &&
+ rightType == KeyValue.Type.Minimum.getCode()) {
+ return -1;
+ }
+
+ // Compare families.
+ int leftFamilyPosition = left.getFamilyPosition(leftFamilyLengthPosition);
+ int rightFamilyPosition = right.getFamilyPosition(rightFamilyLengthPosition);
+ diff = ByteBufferUtils.compareTo(left.getFamilyByteBuffer(), leftFamilyPosition,
+ leftFamilyLength,
+ right.getFamilyByteBuffer(), rightFamilyPosition, rightFamilyLength);
+ if (diff != 0) {
+ return diff;
+ }
+
+ // Compare qualifiers
+ diff = ByteBufferUtils.compareTo(left.getQualifierByteBuffer(),
+ left.getQualifierPosition(leftFamilyPosition, leftFamilyLength), leftQualifierLength,
+ right.getQualifierByteBuffer(),
+ right.getQualifierPosition(rightFamilyPosition, rightFamilyLength),
+ rightQualifierLength);
+ if (diff != 0) {
+ return diff;
+ }
+
+ // Timestamps.
+ // Swap order we pass into compare so we get DESCENDING order.
+ diff = Long.compare(right.getTimestamp(rightKeyLength), left.getTimestamp(leftKeyLength));
+ if (diff != 0) {
+ return diff;
+ }
+
+ // Compare types. Let the delete types sort ahead of puts; i.e. types
+ // of higher numbers sort before those of lesser numbers. Maximum (255)
+ // appears ahead of everything, and minimum (0) appears after
+ // everything.
+ diff = (0xff & rightType) - (0xff & leftType);
+ if (diff != 0) {
+ return diff;
+ }
+
+ // Negate following comparisons so later edits show up first mvccVersion: later sorts first
+ return Longs.compare(right.getSequenceId(), left.getSequenceId());
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index 60be67082bc..3529d54f7df 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -21,6 +21,7 @@ import java.util.Comparator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
+
/**
* Comparator for comparing cells and has some specialized methods that allows comparing individual
* cell components like row, family, qualifier and timestamp
@@ -130,4 +131,11 @@ public interface CellComparator extends Comparator {
* timestamp 0 if both timestamps are equal
*/
int compareTimestamps(long leftCellts, long rightCellts);
+
+ /**
+ * @return A dumbed-down, fast comparator for hbase2 base-type, the {@link ByteBufferKeyValue}.
+ * Create an instance when you make a new memstore, when you know only BBKVs will be passed.
+ * Do not pollute with types other than BBKV if can be helped; the Comparator will slow.
+ */
+ Comparator getSimpleComparator();
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java
index 785d8ffb1c3..c2f2ea55d52 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java
@@ -18,15 +18,18 @@
package org.apache.hadoop.hbase;
+import java.util.Comparator;
+
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
+
/**
* Compare two HBase cells. Do not use this method comparing -ROOT- or
@@ -34,9 +37,13 @@ import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;
* takes account of the special formatting of the row where we have commas to delimit table from
* regionname, from row. See KeyValue for how it has a special comparator to do hbase:meta cells
* and yet another for -ROOT-.
- * While using this comparator for {{@link #compareRows(Cell, Cell)} et al, the hbase:meta cells
+ * While using this comparator for {{@link #compareRows(Cell, Cell)} et al, the hbase:meta cells
* format should be taken into consideration, for which the instance of this comparator
* should be used. In all other cases the static APIs in this comparator would be enough
+ * HOT methods. We spend a good portion of CPU comparing. Anything that makes the compare
+ * faster will likely manifest at the macro level. See also
+ * {@link BBKVComparator}. Use it when mostly {@link ByteBufferKeyValue}s.
+ *
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="UNKNOWN",
@@ -57,21 +64,17 @@ public class CellComparatorImpl implements CellComparator {
public static final CellComparatorImpl META_COMPARATOR = new MetaCellComparator();
@Override
- public int compare(Cell a, Cell b) {
+ public final int compare(final Cell a, final Cell b) {
return compare(a, b, false);
}
- /**
- * Compare cells.
- * @param ignoreSequenceid True if we are to compare the key portion only and ignore
- * the sequenceid. Set to false to compare key and consider sequenceid.
- * @return 0 if equal, -1 if a < b, and +1 if a > b.
- */
@Override
public int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
+
int diff = 0;
+ // "Peel off" the most common path.
if (a instanceof ByteBufferKeyValue && b instanceof ByteBufferKeyValue) {
- diff = compareByteBufferKeyValue((ByteBufferKeyValue)a, (ByteBufferKeyValue)b);
+ diff = BBKVComparator.compare((ByteBufferKeyValue)a, (ByteBufferKeyValue)b);
if (diff != 0) {
return diff;
}
@@ -88,82 +91,7 @@ public class CellComparatorImpl implements CellComparator {
}
// Negate following comparisons so later edits show up first mvccVersion: later sorts first
- return ignoreSequenceid? diff: Longs.compare(b.getSequenceId(), a.getSequenceId());
- }
-
- /**
- * Specialized comparator for the ByteBufferKeyValue type exclusivesly.
- * Caches deserialized lengths of rows and families, etc., and reuses them where it can
- * (ByteBufferKeyValue has been changed to be amenable to our providing pre-made lengths, etc.)
- */
- private static final int compareByteBufferKeyValue(ByteBufferKeyValue left,
- ByteBufferKeyValue right) {
- // Compare Rows. Cache row length.
- int leftRowLength = left.getRowLength();
- int rightRowLength = right.getRowLength();
- int diff = ByteBufferUtils.compareTo(
- left.getRowByteBuffer(), left.getRowPosition(), leftRowLength,
- right.getRowByteBuffer(), right.getRowPosition(), rightRowLength);
- if (diff != 0) {
- return diff;
- }
-
- // If the column is not specified, the "minimum" key type appears the
- // latest in the sorted order, regardless of the timestamp. This is used
- // for specifying the last key/value in a given row, because there is no
- // "lexicographically last column" (it would be infinitely long). The
- // "maximum" key type does not need this behavior.
- // Copied from KeyValue. This is bad in that we can't do memcmp w/ special rules like this.
- // I tried to get rid of the above but scanners depend on it. TODO.
- int leftFamilyLengthPosition = left.getFamilyLengthPosition(leftRowLength);
- int leftFamilyLength = left.getFamilyLength(leftFamilyLengthPosition);
- int rightFamilyLengthPosition = right.getFamilyLengthPosition(rightRowLength);
- int rightFamilyLength = right.getFamilyLength(rightFamilyLengthPosition);
- int leftKeyLength = left.getKeyLength();
- int leftQualifierLength = left.getQualifierLength(leftKeyLength, leftRowLength,
- leftFamilyLength);
- byte leftType = left.getTypeByte(leftKeyLength);
- if (leftFamilyLength + leftQualifierLength == 0 && leftType == Type.Minimum.getCode()) {
- // left is "bigger", i.e. it appears later in the sorted order
- return 1;
- }
- int rightKeyLength = right.getKeyLength();
- int rightQualifierLength = right.getQualifierLength(rightKeyLength, rightRowLength,
- rightFamilyLength);
- byte rightType = right.getTypeByte(rightKeyLength);
- if (rightFamilyLength + rightQualifierLength == 0 && rightType == Type.Minimum.getCode()) {
- return -1;
- }
-
- // Compare families.
- int leftFamilyPosition = left.getFamilyPosition(leftFamilyLengthPosition);
- int rightFamilyPosition = right.getFamilyPosition(rightFamilyLengthPosition);
- diff = ByteBufferUtils.compareTo(
- left.getFamilyByteBuffer(), leftFamilyPosition, leftFamilyLength,
- right.getFamilyByteBuffer(), rightFamilyPosition, rightFamilyLength);
- if (diff != 0) {
- return diff;
- }
- // Compare qualifiers
- diff = ByteBufferUtils.compareTo(left.getQualifierByteBuffer(),
- left.getQualifierPosition(leftFamilyPosition, leftFamilyLength), leftQualifierLength,
- right.getQualifierByteBuffer(),
- right.getQualifierPosition(rightFamilyPosition, rightFamilyLength),
- rightQualifierLength);
- if (diff != 0) {
- return diff;
- }
- // Timestamps.
- diff = compareTimestampsInternal(left.getTimestamp(leftKeyLength),
- right.getTimestamp(rightKeyLength));
- if (diff != 0) {
- return diff;
- }
- // Compare types. Let the delete types sort ahead of puts; i.e. types
- // of higher numbers sort before those of lesser numbers. Maximum (255)
- // appears ahead of everything, and minimum (0) appears after
- // everything.
- return (0xff & rightType) - (0xff & leftType);
+ return ignoreSequenceid? diff: Long.compare(b.getSequenceId(), a.getSequenceId());
}
/**
@@ -254,7 +182,7 @@ public class CellComparatorImpl implements CellComparator {
return compareRows(left, left.getRowLength(), right, right.getRowLength());
}
- int compareRows(final Cell left, int leftRowLength, final Cell right, int rightRowLength) {
+ static int compareRows(final Cell left, int leftRowLength, final Cell right, int rightRowLength) {
// left and right can be exactly the same at the beginning of a row
if (left == right) {
return 0;
@@ -341,7 +269,7 @@ public class CellComparatorImpl implements CellComparator {
return diff;
}
- diff = compareTimestamps(left, right);
+ diff = compareTimestamps(left.getTimestamp(), right.getTimestamp());
if (diff != 0) {
return diff;
}
@@ -355,16 +283,12 @@ public class CellComparatorImpl implements CellComparator {
@Override
public int compareTimestamps(final Cell left, final Cell right) {
- return compareTimestampsInternal(left.getTimestamp(), right.getTimestamp());
+ return compareTimestamps(left.getTimestamp(), right.getTimestamp());
}
@Override
public int compareTimestamps(final long ltimestamp, final long rtimestamp) {
- return compareTimestampsInternal(ltimestamp, rtimestamp);
- }
-
- private static final int compareTimestampsInternal(final long ltimestamp, final long rtimestamp) {
- // Swap the times so sort is newest to oldest, descending.
+ // Swap order we pass into compare so we get DESCENDING order.
return Long.compare(rtimestamp, ltimestamp);
}
@@ -373,6 +297,7 @@ public class CellComparatorImpl implements CellComparator {
* {@link KeyValue}s.
*/
public static class MetaCellComparator extends CellComparatorImpl {
+ // TODO: Do we need a ByteBufferKeyValue version of this?
@Override
public int compareRows(final Cell left, final Cell right) {
return compareRows(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
@@ -451,5 +376,15 @@ public class CellComparatorImpl implements CellComparator {
right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
return result;
}
+
+ @Override
+ public Comparator getSimpleComparator() {
+ return this;
+ }
+ }
+
+ @Override
+ public Comparator getSimpleComparator() {
+ return new BBKVComparator(this);
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index 07b0e3f7986..8efe88b8e0c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -30,9 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience;
* must implement this.
*/
@InterfaceAudience.Private
-public interface ExtendedCell extends RawCell, HeapSize, Cloneable {
-
+public interface ExtendedCell extends RawCell, HeapSize {
int CELL_NOT_BASED_ON_CHUNK = -1;
+
/**
* Write this cell to an OutputStream in a {@link KeyValue} format.
* KeyValue format
@@ -87,6 +87,13 @@ public interface ExtendedCell extends RawCell, HeapSize, Cloneable {
getValueLength(), getTagsLength(), withTags);
}
+ /**
+ * @return Serialized size (defaults to include tag length).
+ */
+ default int getSerializedSize() {
+ return getSerializedSize(true);
+ }
+
/**
* Write this Cell into the given buf's offset in a {@link KeyValue} format.
* @param buf The buffer where to write the Cell.
@@ -108,7 +115,8 @@ public interface ExtendedCell extends RawCell, HeapSize, Cloneable {
/**
* Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized
* chunks as in case of MemstoreLAB
- * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1
+ * @return the chunk id if the cell is backed by fixed sized Chunks, else return
+ * {@link #CELL_NOT_BASED_ON_CHUNK}; i.e. -1.
*/
default int getChunkId() {
return CELL_NOT_BASED_ON_CHUNK;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java
index 7093b4b2dcf..e47702389f1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public class IndividualBytesFieldCell implements ExtendedCell {
+public class IndividualBytesFieldCell implements ExtendedCell, Cloneable {
private static final long FIXED_OVERHEAD = ClassSize.align( // do alignment(padding gap)
ClassSize.OBJECT // object header
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f3bfbd3fd02..f7f6c0ded46 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -76,7 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* length and actual tag bytes length.
*/
@InterfaceAudience.Private
-public class KeyValue implements ExtendedCell {
+public class KeyValue implements ExtendedCell, Cloneable {
private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList<>();
private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 2246002eea4..11745a4d96d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -50,8 +50,7 @@ public final class ByteBufferUtils {
public final static int NEXT_BIT_MASK = 1 << 7;
@VisibleForTesting
final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
- @VisibleForTesting
- final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+ public final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
private ByteBufferUtils() {
}
@@ -630,6 +629,8 @@ public final class ByteBufferUtils {
}
public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+ // NOTE: This method is copied over in BBKVComparator!!!!! For perf reasons. If you make
+ // changes here, make them there too!!!!
if (UNSAFE_UNALIGNED) {
long offset1Adj, offset2Adj;
Object refObj1 = null, refObj2 = null;
@@ -671,9 +672,12 @@ public final class ByteBufferUtils {
return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
}
+ // The below two methods show up in lots of places. Versions of them in commons util and in
+ // Cassandra. In guava too? They are copied from ByteBufferUtils. They are here as static
+ // privates. Seems to make code smaller and make Hotspot happier (comes of compares and study
+ // of compiled code via jitwatch).
+
public static int compareTo(byte [] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
- // This method is nearly same as the compareTo that follows but hard sharing code given
- // byte array and bytebuffer types and this is a hot code path
if (UNSAFE_UNALIGNED) {
long offset2Adj;
Object refObj2 = null;
@@ -738,7 +742,7 @@ public final class ByteBufferUtils {
long lw = UnsafeAccess.theUnsafe.getLong(obj1, o1 + (long) i);
long rw = UnsafeAccess.theUnsafe.getLong(obj2, o2 + (long) i);
if (lw != rw) {
- if (!UnsafeAccess.littleEndian) {
+ if (!UnsafeAccess.LITTLE_ENDIAN) {
return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index 6eb09c11df4..15facea3071 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -1549,7 +1549,7 @@ public class Bytes implements Comparable {
long lw = theUnsafe.getLong(buffer1, offset1Adj + i);
long rw = theUnsafe.getLong(buffer2, offset2Adj + i);
if (lw != rw) {
- if(!UnsafeAccess.littleEndian) {
+ if(!UnsafeAccess.LITTLE_ENDIAN) {
return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index 486f81beb6d..953ad5b533e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -42,7 +42,7 @@ public final class UnsafeAccess {
/** The offset to the first element in a byte array. */
public static final long BYTE_ARRAY_BASE_OFFSET;
- static final boolean littleEndian = ByteOrder.nativeOrder()
+ public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder()
.equals(ByteOrder.LITTLE_ENDIAN);
// This number limits the number of bytes to copy per call to Unsafe's
@@ -81,7 +81,7 @@ public final class UnsafeAccess {
* @return the short value
*/
public static short toShort(byte[] bytes, int offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Short.reverseBytes(theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
} else {
return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
@@ -95,7 +95,7 @@ public final class UnsafeAccess {
* @return the int value
*/
public static int toInt(byte[] bytes, int offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
} else {
return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
@@ -109,7 +109,7 @@ public final class UnsafeAccess {
* @return the long value
*/
public static long toLong(byte[] bytes, int offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Long.reverseBytes(theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
} else {
return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
@@ -125,7 +125,7 @@ public final class UnsafeAccess {
* @return incremented offset
*/
public static int putShort(byte[] bytes, int offset, short val) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
val = Short.reverseBytes(val);
}
theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
@@ -140,7 +140,7 @@ public final class UnsafeAccess {
* @return incremented offset
*/
public static int putInt(byte[] bytes, int offset, int val) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
val = Integer.reverseBytes(val);
}
theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
@@ -155,7 +155,7 @@ public final class UnsafeAccess {
* @return incremented offset
*/
public static int putLong(byte[] bytes, int offset, long val) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
val = Long.reverseBytes(val);
}
theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
@@ -172,7 +172,7 @@ public final class UnsafeAccess {
* @return short value at offset
*/
public static short toShort(ByteBuffer buf, int offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Short.reverseBytes(getAsShort(buf, offset));
}
return getAsShort(buf, offset);
@@ -186,7 +186,7 @@ public final class UnsafeAccess {
* @return short value at offset
*/
public static short toShort(Object ref, long offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Short.reverseBytes(theUnsafe.getShort(ref, offset));
}
return theUnsafe.getShort(ref, offset);
@@ -214,7 +214,7 @@ public final class UnsafeAccess {
* @return int value at offset
*/
public static int toInt(ByteBuffer buf, int offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Integer.reverseBytes(getAsInt(buf, offset));
}
return getAsInt(buf, offset);
@@ -228,7 +228,7 @@ public final class UnsafeAccess {
* @return int value at offset
*/
public static int toInt(Object ref, long offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Integer.reverseBytes(theUnsafe.getInt(ref, offset));
}
return theUnsafe.getInt(ref, offset);
@@ -256,7 +256,7 @@ public final class UnsafeAccess {
* @return long value at offset
*/
public static long toLong(ByteBuffer buf, int offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Long.reverseBytes(getAsLong(buf, offset));
}
return getAsLong(buf, offset);
@@ -270,7 +270,7 @@ public final class UnsafeAccess {
* @return long value at offset
*/
public static long toLong(Object ref, long offset) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
return Long.reverseBytes(theUnsafe.getLong(ref, offset));
}
return theUnsafe.getLong(ref, offset);
@@ -297,7 +297,7 @@ public final class UnsafeAccess {
* @return incremented offset
*/
public static int putInt(ByteBuffer buf, int offset, int val) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
val = Integer.reverseBytes(val);
}
if (buf.isDirect()) {
@@ -402,7 +402,7 @@ public final class UnsafeAccess {
* @return incremented offset
*/
public static int putShort(ByteBuffer buf, int offset, short val) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
val = Short.reverseBytes(val);
}
if (buf.isDirect()) {
@@ -421,7 +421,7 @@ public final class UnsafeAccess {
* @return incremented offset
*/
public static int putLong(ByteBuffer buf, int offset, long val) {
- if (littleEndian) {
+ if (LITTLE_ENDIAN) {
val = Long.reverseBytes(val);
}
if (buf.isDirect()) {
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java
index 706941154d6..6443d84ebd2 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertFalse;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentSkipListMap;
+
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -67,9 +69,23 @@ public class TestByteBufferKeyValue {
assertTrue(CellComparatorImpl.COMPARATOR.compare(cell1, cell3) < 0);
Cell cell4 = getOffheapCell(row1, Bytes.toBytes("f"), qual2);
assertTrue(CellComparatorImpl.COMPARATOR.compare(cell1, cell4) > 0);
+ BBKVComparator comparator = new BBKVComparator(null);
+ assertTrue(comparator.compare(cell1, cell2) < 0);
+ assertTrue(comparator.compare(cell1, cell3) < 0);
+ assertTrue(comparator.compare(cell1, cell4) > 0);
+ ByteBuffer buf = ByteBuffer.allocate(row1.length);
+ ByteBufferUtils.copyFromArrayToBuffer(buf, row1, 0, row1.length);
+
+ ConcurrentSkipListMap map =
+ new ConcurrentSkipListMap<>(comparator);
+ map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1);
+ map.put((ByteBufferKeyValue)cell2, (ByteBufferKeyValue)cell2);
+ map.put((ByteBufferKeyValue)cell3, (ByteBufferKeyValue)cell3);
+ map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1);
+ map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1);
}
- private Cell getOffheapCell(byte [] row, byte [] family, byte [] qualifier) {
+ private static Cell getOffheapCell(byte [] row, byte [] family, byte [] qualifier) {
KeyValue kvCell = new KeyValue(row, family, qualifier, 0L, Type.Put, row);
ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
index 624aa446abb..365082682be 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java
@@ -33,7 +33,9 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -200,8 +202,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup "
+ request.getTargetGroup());
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup());
+ }
checkPermission("moveServers");
groupAdminServer.moveServers(hostPorts, request.getTargetGroup());
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postMoveServers(hostPorts, request.getTargetGroup());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@@ -219,8 +227,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup "
+ request.getTargetGroup());
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup());
+ }
checkPermission("moveTables");
groupAdminServer.moveTables(tables, request.getTargetGroup());
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@@ -233,8 +247,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName());
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName());
+ }
checkPermission("addRSGroup");
groupAdminServer.addRSGroup(request.getRSGroupName());
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@@ -248,8 +268,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
RemoveRSGroupResponse.newBuilder();
LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName());
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName());
+ }
checkPermission("removeRSGroup");
groupAdminServer.removeRSGroup(request.getRSGroupName());
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@@ -263,8 +289,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group=" +
request.getRSGroupName());
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName());
+ }
checkPermission("balanceRSGroup");
- builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
+ boolean balancerRan = groupAdminServer.balanceRSGroup(request.getRSGroupName());
+ builder.setBalanceRan(balancerRan);
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(),
+ balancerRan);
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
builder.setBalanceRan(false);
@@ -298,9 +332,9 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
+ hp);
try {
checkPermission("getRSGroupInfoOfServer");
- RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(hp);
- if (RSGroupInfo != null) {
- builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
+ RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp);
+ if (info != null) {
+ builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(info));
}
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
@@ -323,8 +357,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts
+ " and tables " + tables + " to rsgroup" + request.getTargetGroup());
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables,
+ request.getTargetGroup());
+ }
checkPermission("moveServersAndTables");
groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup());
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables,
+ request.getTargetGroup());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@@ -344,8 +386,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
LOG.info(master.getClientIdAuditPrefix()
+ " remove decommissioned servers from rsgroup: " + servers);
try {
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().preRemoveServers(servers);
+ }
checkPermission("removeServers");
groupAdminServer.removeServers(servers);
+ if (master.getMasterCoprocessorHost() != null) {
+ master.getMasterCoprocessorHost().postRemoveServers(servers);
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
}
@@ -354,12 +402,20 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
}
boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException {
- String groupName =
+ String groupName;
+ try {
+ groupName =
master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString())
.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
- if (groupName == null) {
+ if (groupName == null) {
+ groupName = RSGroupInfo.DEFAULT_GROUP;
+ }
+ } catch (MasterNotRunningException | PleaseHoldException e) {
+ LOG.info("Master has not initialized yet; temporarily using default RSGroup '" +
+ RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
+
RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
if (rsGroupInfo == null) {
throw new ConstraintException(
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
index 670e8aa62ba..b39d3a19a1b 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
@@ -291,9 +291,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName);
- }
// Presume first server's source group. Later ensure all servers are from this group.
Address firstServer = servers.iterator().next();
RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer);
@@ -370,9 +367,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
} while (foundRegionsToMove);
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName);
- }
LOG.info("Move server done: " + srcGrp.getName() + "=>" + targetGroupName);
}
}
@@ -390,9 +384,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup);
- }
if(targetGroup != null) {
RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup);
if(destGroup == null) {
@@ -430,22 +421,12 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
}
}
-
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
- }
}
}
@Override
public void addRSGroup(String name) throws IOException {
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preAddRSGroup(name);
- }
rsGroupInfoManager.addRSGroup(new RSGroupInfo(name));
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postAddRSGroup(name);
- }
}
@Override
@@ -453,9 +434,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preRemoveRSGroup(name);
- }
RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name);
if (rsGroupInfo == null) {
throw new ConstraintException("RSGroup " + name + " does not exist");
@@ -480,9 +458,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
}
}
rsGroupInfoManager.removeRSGroup(name);
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postRemoveRSGroup(name);
- }
}
}
@@ -498,9 +473,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
return false;
}
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preBalanceRSGroup(groupName);
- }
if (getRSGroupInfo(groupName) == null) {
throw new ConstraintException("RSGroup does not exist: "+groupName);
}
@@ -542,9 +514,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
LOG.info("RSGroup balance " + groupName + " completed after " +
(System.currentTimeMillis()-startTime) + " seconds");
}
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan);
- }
return balancerRan;
}
}
@@ -575,9 +544,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers and tables to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, targetGroup);
- }
//check servers and tables status
checkServersAndTables(servers, tables, targetGroup);
@@ -589,10 +555,6 @@ public class RSGroupAdminServer implements RSGroupAdmin {
moveRegionsFromServers(servers, tables, targetGroup);
//move regions which should belong to these servers
moveRegionsToServers(servers, tables, targetGroup);
-
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup);
- }
}
LOG.info("Move servers and tables done. Severs :"
+ servers + " , Tables : " + tables + " => " + targetGroup);
@@ -607,15 +569,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
// Hold a lock on the manager instance while moving servers to prevent
// another writer changing our state while we are working.
synchronized (rsGroupInfoManager) {
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().preRemoveServers(servers);
- }
//check the set of servers
checkForDeadOrOnlineServers(servers);
rsGroupInfoManager.removeServers(servers);
- if (master.getMasterCoprocessorHost() != null) {
- master.getMasterCoprocessorHost().postRemoveServers(servers);
- }
LOG.info("Remove decommissioned servers " + servers + " from rsgroup done.");
}
}
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
index 9eb048fbf52..69131f98548 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java
@@ -200,6 +200,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
for (RegionInfo region : regions.keySet()) {
if (!misplacedRegions.contains(region)) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
+ if (groupName == null) {
+ LOG.info("Group not found for table " + region.getTable() + ", using default");
+ groupName = RSGroupInfo.DEFAULT_GROUP;
+ }
groupToRegion.put(groupName, region);
}
}
@@ -221,6 +225,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
for (RegionInfo region : misplacedRegions) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
+ if (groupName == null) {
+ LOG.info("Group not found for table " + region.getTable() + ", using default");
+ groupName = RSGroupInfo.DEFAULT_GROUP;
+ }
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
List candidateList = filterOfflineServers(info, servers);
ServerName server = this.internalBalancer.randomAssignment(region,
@@ -263,7 +271,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
for (RegionInfo region : regions) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
- LOG.warn("Group for table "+region.getTable()+" is null");
+ LOG.info("Group not found for table " + region.getTable() + ", using default");
+ groupName = RSGroupInfo.DEFAULT_GROUP;
}
regionMap.put(groupName, region);
}
@@ -322,8 +331,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
for(Map.Entry region : regions.entrySet()) {
RegionInfo regionInfo = region.getKey();
ServerName assignedServer = region.getValue();
- RSGroupInfo info = rsGroupInfoManager.getRSGroup(rsGroupInfoManager.
- getRSGroupOfTable(regionInfo.getTable()));
+ String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable());
+ if (groupName == null) {
+ LOG.info("Group not found for table " + regionInfo.getTable() + ", using default");
+ groupName = RSGroupInfo.DEFAULT_GROUP;
+ }
+ RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
if (assignedServer == null) {
LOG.debug("There is no assigned server for {}", region);
continue;
@@ -358,8 +371,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
for (RegionInfo region : regions) {
RSGroupInfo targetRSGInfo = null;
try {
- targetRSGInfo = rsGroupInfoManager.getRSGroup(
- rsGroupInfoManager.getRSGroupOfTable(region.getTable()));
+ String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
+ if (groupName == null) {
+ LOG.info("Group not found for table " + region.getTable() + ", using default");
+ groupName = RSGroupInfo.DEFAULT_GROUP;
+ }
+ targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName);
} catch (IOException exp) {
LOG.debug("RSGroup information null for region of table " + region.getTable(),
exp);
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 4ae92a615c3..6fe7e39079b 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -23,7 +23,6 @@ import com.google.protobuf.ServiceException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -72,10 +71,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
-import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
-import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -249,7 +245,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
public synchronized void moveTables(Set tableNames, String groupName)
throws IOException {
if (groupName != null && !rsGroupMap.containsKey(groupName)) {
- throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
+ throw new DoNotRetryIOException("Group "+groupName+" does not exist");
}
Map newGroupMap = Maps.newHashMap(rsGroupMap);
@@ -408,18 +404,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
orphanTables.add(TableName.valueOf(entry));
}
-
- List specialTables =
- new ArrayList(Arrays.asList(AccessControlLists.ACL_TABLE_NAME,
- TableName.META_TABLE_NAME, TableName.NAMESPACE_TABLE_NAME, RSGROUP_TABLE_NAME));
- // if quota is enabled, add corresponding system table to special tables list
- if (QuotaUtil.isQuotaEnabled(conn.getConfiguration())) {
- specialTables.add(QuotaTableUtil.QUOTA_TABLE_NAME);
- }
-
- for (TableName table : specialTables) {
- orphanTables.add(table);
- }
for (RSGroupInfo group: groupList) {
if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
orphanTables.removeAll(group.getTables());
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
index 521b8b91a4c..3e74f819cd3 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -36,6 +39,11 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.net.Address;
@@ -69,7 +77,7 @@ public class TestRSGroups extends TestRSGroupsBase {
protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroups.class);
private static boolean INIT = false;
private static RSGroupAdminEndpoint rsGroupAdminEndpoint;
-
+ private static CPMasterObserver observer;
@BeforeClass
public static void setUp() throws Exception {
@@ -80,7 +88,7 @@ public class TestRSGroups extends TestRSGroupsBase {
HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
RSGroupBasedLoadBalancer.class.getName());
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
- RSGroupAdminEndpoint.class.getName());
+ RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
// Enable quota for testRSGroupsWithHBaseQuota()
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
@@ -104,8 +112,10 @@ public class TestRSGroups extends TestRSGroupsBase {
admin.setBalancerRunning(false,true);
rsGroupAdmin = new VerifyingRSGroupAdminClient(
new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration());
+ MasterCoprocessorHost host = master.getMasterCoprocessorHost();
+ observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName());
rsGroupAdminEndpoint = (RSGroupAdminEndpoint)
- master.getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName());
+ host.findCoprocessor(RSGroupAdminEndpoint.class.getName());
}
@AfterClass
@@ -148,6 +158,7 @@ public class TestRSGroups extends TestRSGroupsBase {
} catch (Exception ex) {
LOG.warn("Got this on setup, FYI", ex);
}
+ assertTrue(observer.preMoveServersCalled);
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() {
@Override
public boolean evaluate() throws Exception {
@@ -221,6 +232,9 @@ public class TestRSGroups extends TestRSGroupsBase {
String groupName = tablePrefix+"_foo";
LOG.info("testNamespaceConstraint");
rsGroupAdmin.addRSGroup(groupName);
+ assertTrue(observer.preAddRSGroupCalled);
+ assertTrue(observer.postAddRSGroupCalled);
+
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
.build());
@@ -241,6 +255,8 @@ public class TestRSGroups extends TestRSGroupsBase {
//test add non-existent group
admin.deleteNamespace(nsName);
rsGroupAdmin.removeRSGroup(groupName);
+ assertTrue(observer.preRemoveRSGroupCalled);
+ assertTrue(observer.postRemoveRSGroupCalled);
try {
admin.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo")
@@ -261,6 +277,120 @@ public class TestRSGroups extends TestRSGroupsBase {
it.next();
}
+ public static class CPMasterObserver implements MasterCoprocessor, MasterObserver {
+ boolean preBalanceRSGroupCalled = false;
+ boolean postBalanceRSGroupCalled = false;
+ boolean preMoveServersCalled = false;
+ boolean postMoveServersCalled = false;
+ boolean preMoveTablesCalled = false;
+ boolean postMoveTablesCalled = false;
+ boolean preAddRSGroupCalled = false;
+ boolean postAddRSGroupCalled = false;
+ boolean preRemoveRSGroupCalled = false;
+ boolean postRemoveRSGroupCalled = false;
+ boolean preRemoveServersCalled = false;
+ boolean postRemoveServersCalled = false;
+ boolean preMoveServersAndTables = false;
+ boolean postMoveServersAndTables = false;
+
+ @Override
+ public Optional getMasterObserver() {
+ return Optional.of(this);
+ }
+ @Override
+ public void preMoveServersAndTables(final ObserverContext ctx,
+ Set servers, Set tables, String targetGroup) throws IOException {
+ preMoveServersAndTables = true;
+ }
+ @Override
+ public void postMoveServersAndTables(final ObserverContext ctx,
+ Set servers, Set tables, String targetGroup) throws IOException {
+ postMoveServersAndTables = true;
+ }
+ @Override
+ public void preRemoveServers(
+ final ObserverContext ctx,
+ Set servers) throws IOException {
+ preRemoveServersCalled = true;
+ }
+ @Override
+ public void postRemoveServers(
+ final ObserverContext ctx,
+ Set servers) throws IOException {
+ postRemoveServersCalled = true;
+ }
+ @Override
+ public void preRemoveRSGroup(final ObserverContext ctx,
+ String name) throws IOException {
+ preRemoveRSGroupCalled = true;
+ }
+ @Override
+ public void postRemoveRSGroup(final ObserverContext ctx,
+ String name) throws IOException {
+ postRemoveRSGroupCalled = true;
+ }
+ @Override
+ public void preAddRSGroup(final ObserverContext ctx,
+ String name) throws IOException {
+ preAddRSGroupCalled = true;
+ }
+ @Override
+ public void postAddRSGroup(final ObserverContext ctx,
+ String name) throws IOException {
+ postAddRSGroupCalled = true;
+ }
+ @Override
+ public void preMoveTables(final ObserverContext ctx,
+ Set tables, String targetGroup) throws IOException {
+ preMoveTablesCalled = true;
+ }
+ @Override
+ public void postMoveTables(final ObserverContext ctx,
+ Set tables, String targetGroup) throws IOException {
+ postMoveTablesCalled = true;
+ }
+ @Override
+ public void preMoveServers(final ObserverContext ctx,
+ Set servers, String targetGroup) throws IOException {
+ preMoveServersCalled = true;
+ }
+
+ @Override
+ public void postMoveServers(final ObserverContext ctx,
+ Set servers, String targetGroup) throws IOException {
+ postMoveServersCalled = true;
+ }
+ @Override
+ public void preBalanceRSGroup(final ObserverContext ctx,
+ String groupName) throws IOException {
+ preBalanceRSGroupCalled = true;
+ }
+ @Override
+ public void postBalanceRSGroup(final ObserverContext ctx,
+ String groupName, boolean balancerRan) throws IOException {
+ postBalanceRSGroupCalled = true;
+ }
+ }
+ @Test
+ public void testMoveServersAndTables() throws Exception {
+ super.testMoveServersAndTables();
+ assertTrue(observer.preMoveServersAndTables);
+ assertTrue(observer.postMoveServersAndTables);
+ }
+ @Test
+ public void testTableMoveTruncateAndDrop() throws Exception {
+ super.testTableMoveTruncateAndDrop();
+ assertTrue(observer.preMoveTablesCalled);
+ assertTrue(observer.postMoveTablesCalled);
+ }
+
+ @Test
+ public void testRemoveServers() throws Exception {
+ super.testRemoveServers();
+ assertTrue(observer.preRemoveServersCalled);
+ assertTrue(observer.postRemoveServersCalled);
+ }
+
@Test
public void testMisplacedRegions() throws Exception {
final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
@@ -277,6 +407,8 @@ public class TestRSGroups extends TestRSGroupsBase {
admin.setBalancerRunning(true,true);
assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName()));
admin.setBalancerRunning(false,true);
+ assertTrue(observer.preBalanceRSGroupCalled);
+ assertTrue(observer.postBalanceRSGroupCalled);
TEST_UTIL.waitFor(60000, new Predicate() {
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 4c9f6f6d659..5bcaa172d1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -332,8 +332,8 @@ public class HFile {
try {
ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
} catch (UnsupportedOperationException uoe) {
- if (LOG.isTraceEnabled()) LOG.trace("Unable to set drop behind on " + path, uoe);
- else if (LOG.isDebugEnabled()) LOG.debug("Unable to set drop behind on " + path);
+ LOG.trace("Unable to set drop behind on {}", path, uoe);
+ LOG.debug("Unable to set drop behind on {}", path.getName());
}
}
return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, fileContext);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
index a4fe883668c..94a256d9341 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java
@@ -53,7 +53,7 @@ public class CellSet implements NavigableSet {
private final int numUniqueKeys;
CellSet(final CellComparator c) {
- this.delegatee = new ConcurrentSkipListMap<>(c);
+ this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
this.numUniqueKeys = UNKNOWN_NUM_UNIQUES;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index 5577be47ad2..44ba65f6208 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -46,7 +46,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
@InterfaceAudience.Private
public class ChunkCreator {
private static final Logger LOG = LoggerFactory.getLogger(ChunkCreator.class);
- // monotonically increasing chunkid
+ // monotonically increasing chunkid. Starts at 1.
private AtomicInteger chunkID = new AtomicInteger(1);
// maps the chunk against the monotonically increasing chunk id. We need to preserve the
// natural ordering of the key
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 49263984057..c0203a49052 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2552,8 +2552,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," +
- " memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) +
- " memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) +
+ " dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
+ " heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
}
@@ -2753,10 +2753,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long time = EnvironmentEdgeManager.currentTime() - startTime;
MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize();
long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize();
- String msg = "Finished memstore flush;"
- + " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
- + ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
- + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ String msg = "Finished flush of"
+ + " dataSize ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize()
+ + ", heapSize ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
+ + ", currentSize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid="
+ flushOpSeqId + ", compaction requested=" + compactionRequested
+ ((wal == null) ? "; wal=null" : "");
@@ -4236,7 +4236,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param cellItr
* @param now
*/
- public void updateCellTimestamps(final Iterable> cellItr, final byte[] now)
+ private static void updateCellTimestamps(final Iterable> cellItr, final byte[] now)
throws IOException {
for (List cells: cellItr) {
if (cells == null) continue;
@@ -4291,12 +4291,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
requestFlush();
// Don't print current limit because it will vary too much. The message is used as a key
// over in RetriesExhaustedWithDetailsException processing.
- throw new RegionTooBusyException("Over memstore limit; regionName=" +
+ throw new RegionTooBusyException("Over memstore limit=" +
+ org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize) +
+ ", regionName=" +
(this.getRegionInfo() == null? "unknown": this.getRegionInfo().getEncodedName()) +
- ", server=" + (this.getRegionServerServices() == null ? "unknown":
- this.getRegionServerServices().getServerName()) +
- ", blockingMemStoreSize=" +
- org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(blockingMemStoreSize));
+ ", server=" + (this.getRegionServerServices() == null? "unknown":
+ this.getRegionServerServices().getServerName()));
}
}
@@ -8602,4 +8602,5 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public void requestFlush(FlushLifeCycleTracker tracker) throws IOException {
requestFlush0(tracker);
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 0db0fd9c285..ac7223f2d8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -120,12 +120,13 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
@Override
public Cell forceCopyOfBigCellInto(Cell cell) {
- int size = KeyValueUtil.length(cell) + ChunkCreator.SIZEOF_CHUNK_HEADER;
+ int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
+ KeyValueUtil.length(cell);
+ size += ChunkCreator.SIZEOF_CHUNK_HEADER;
Preconditions.checkArgument(size >= 0, "negative size");
if (size <= dataChunkSize) {
// Using copyCellInto for cells which are bigger than the original maxAlloc
- Cell newCell = copyCellInto(cell, dataChunkSize);
- return newCell;
+ return copyCellInto(cell, dataChunkSize);
} else {
Chunk c = getNewExternalChunk(size);
int allocOffset = c.alloc(size);
@@ -134,7 +135,8 @@ public class MemStoreLABImpl implements MemStoreLAB {
}
private Cell copyCellInto(Cell cell, int maxAlloc) {
- int size = KeyValueUtil.length(cell);
+ int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize():
+ KeyValueUtil.length(cell);
Preconditions.checkArgument(size >= 0, "negative size");
// Callers should satisfy large allocations directly from JVM since they
// don't cause fragmentation as badly.
@@ -169,7 +171,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
* Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
* out of it
*/
- private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
+ private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
int tagsLen = cell.getTagsLength();
if (cell instanceof ExtendedCell) {
((ExtendedCell) cell).write(buf, offset);
@@ -255,8 +257,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
*/
private Chunk getOrMakeChunk() {
// Try to get the chunk
- Chunk c;
- c = currChunk.get();
+ Chunk c = currChunk.get();
if (c != null) {
return c;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index bd5c529092d..8286f7db742 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -43,21 +43,22 @@ import org.slf4j.LoggerFactory;
* target cluster is an HBase cluster.
*/
@InterfaceAudience.Private
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
- justification="Thinks zkw needs to be synchronized access but should be fine as is.")
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
implements Abortable {
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
- private ZKWatcher zkw = null; // FindBugs: MT_CORRECTNESS
+ private Object zkwLock = new Object();
+ private ZKWatcher zkw = null;
private List regionServers = new ArrayList<>(0);
private long lastRegionServerUpdate;
protected void disconnect() {
- if (zkw != null) {
- zkw.close();
+ synchronized (zkwLock) {
+ if (zkw != null) {
+ zkw.close();
+ }
}
}
@@ -112,7 +113,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
public synchronized UUID getPeerUUID() {
UUID peerUUID = null;
try {
- peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+ synchronized (zkwLock) {
+ peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+ }
} catch (KeeperException ke) {
reconnect(ke);
}
@@ -124,7 +127,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* @return zk connection
*/
protected ZKWatcher getZkw() {
- return zkw;
+ synchronized (zkwLock) {
+ return zkw;
+ }
}
/**
@@ -132,10 +137,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* @throws IOException If anything goes wrong connecting
*/
void reloadZkWatcher() throws IOException {
- if (zkw != null) zkw.close();
- zkw = new ZKWatcher(ctx.getConfiguration(),
+ synchronized (zkwLock) {
+ if (zkw != null) {
+ zkw.close();
+ }
+ zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
- getZkw().registerListener(new PeerRegionServerListener(this));
+ zkw.registerListener(new PeerRegionServerListener(this));
+ }
}
@Override
@@ -173,13 +182,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
* for this peer cluster
* @return list of addresses
*/
- // Synchronize peer cluster connection attempts to avoid races and rate
- // limit connections when multiple replication sources try to connect to
- // the peer cluster. If the peer cluster is down we can get out of control
- // over time.
- public synchronized List getRegionServers() {
+ public List getRegionServers() {
try {
- setRegionServers(fetchSlavesAddresses(this.getZkw()));
+ // Synchronize peer cluster connection attempts to avoid races and rate
+ // limit connections when multiple replication sources try to connect to
+ // the peer cluster. If the peer cluster is down we can get out of control
+ // over time.
+ synchronized (zkwLock) {
+ setRegionServers(fetchSlavesAddresses(zkw));
+ }
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 7727b108c67..24604d988e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -47,9 +47,11 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* implementations:
*
* - defaultProvider : whatever provider is standard for the hbase version. Currently
- * "filesystem"
+ * "asyncfs"
+ * - asyncfs : a provider that will run on top of an implementation of the Hadoop
+ * FileSystem interface via an asynchronous client.
* - filesystem : a provider that will run on top of an implementation of the Hadoop
- * FileSystem interface, normally HDFS.
+ * FileSystem interface via HDFS's synchronous DFSClient.
* - multiwal : a provider that will use multiple "filesystem" wal instances per region
* server.
*
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java
index dc47661166d..d852d2db2ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java
@@ -47,9 +47,10 @@ public class TestTagRewriteCell {
// VisibilityController and AccessController)
Cell trCell2 = PrivateCellUtil.createCell(trCell, new byte[fakeTagArrayLength]);
- assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be larger than a " +
- "single TagRewriteCell's heapsize", trCellHeapSize < ((HeapSize)trCell2).heapSize());
- assertTrue("TagRewriteCell should have had nulled out tags array", ((HeapSize)trCell).heapSize() <
- trCellHeapSize);
+ assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be " +
+ "larger than a single TagRewriteCell's heapsize",
+ trCellHeapSize < ((HeapSize)trCell2).heapSize());
+ assertTrue("TagRewriteCell should have had nulled out tags array",
+ ((HeapSize)trCell).heapSize() < trCellHeapSize);
}
}
| | |