From 304d3e6fa9f44cb25ab03cb6d949f1c9b74779d1 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 21 May 2018 13:53:01 -0700 Subject: [PATCH 1/4] HBASE-20595 Remove the concept of 'special tables' from rsgroups --- .../hbase/rsgroup/RSGroupAdminEndpoint.java | 20 ++++++++++---- .../rsgroup/RSGroupBasedLoadBalancer.java | 27 +++++++++++++++---- .../hbase/rsgroup/RSGroupInfoManagerImpl.java | 18 +------------ 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 624aa446abb..6f3c1d1f15f 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -33,7 +33,9 @@ import java.util.stream.Collectors; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -298,9 +300,9 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { + hp); try { checkPermission("getRSGroupInfoOfServer"); - RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupOfServer(hp); - if (RSGroupInfo != null) { - builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo)); + RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp); + if (info != null) { + builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(info)); } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); @@ -354,12 +356,20 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { } boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException { - String groupName = + String groupName; + try { + groupName = master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); - if (groupName == null) { + if (groupName == null) { + groupName = RSGroupInfo.DEFAULT_GROUP; + } + } catch (MasterNotRunningException | PleaseHoldException e) { + LOG.info("Master has not initialized yet; temporarily using default RSGroup '" + + RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table"); groupName = RSGroupInfo.DEFAULT_GROUP; } + RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); if (rsGroupInfo == null) { throw new ConstraintException( diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index 9eb048fbf52..69131f98548 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -200,6 +200,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for (RegionInfo region : regions.keySet()) { if (!misplacedRegions.contains(region)) { String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.info("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } groupToRegion.put(groupName, region); } } @@ -221,6 +225,10 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for (RegionInfo region : misplacedRegions) { String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.info("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName); List candidateList = filterOfflineServers(info, servers); ServerName server = this.internalBalancer.randomAssignment(region, @@ -263,7 +271,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for (RegionInfo region : regions) { String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); if (groupName == null) { - LOG.warn("Group for table "+region.getTable()+" is null"); + LOG.info("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; } regionMap.put(groupName, region); } @@ -322,8 +331,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for(Map.Entry region : regions.entrySet()) { RegionInfo regionInfo = region.getKey(); ServerName assignedServer = region.getValue(); - RSGroupInfo info = rsGroupInfoManager.getRSGroup(rsGroupInfoManager. - getRSGroupOfTable(regionInfo.getTable())); + String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable()); + if (groupName == null) { + LOG.info("Group not found for table " + regionInfo.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } + RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName); if (assignedServer == null) { LOG.debug("There is no assigned server for {}", region); continue; @@ -358,8 +371,12 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { for (RegionInfo region : regions) { RSGroupInfo targetRSGInfo = null; try { - targetRSGInfo = rsGroupInfoManager.getRSGroup( - rsGroupInfoManager.getRSGroupOfTable(region.getTable())); + String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable()); + if (groupName == null) { + LOG.info("Group not found for table " + region.getTable() + ", using default"); + groupName = RSGroupInfo.DEFAULT_GROUP; + } + targetRSGInfo = rsGroupInfoManager.getRSGroup(groupName); } catch (IOException exp) { LOG.debug("RSGroup information null for region of table " + region.getTable(), exp); diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index 4ae92a615c3..6fe7e39079b 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -23,7 +23,6 @@ import com.google.protobuf.ServiceException; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -72,10 +71,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; -import org.apache.hadoop.hbase.quotas.QuotaTableUtil; -import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; -import org.apache.hadoop.hbase.security.access.AccessControlLists; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -249,7 +245,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { public synchronized void moveTables(Set tableNames, String groupName) throws IOException { if (groupName != null && !rsGroupMap.containsKey(groupName)) { - throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group"); + throw new DoNotRetryIOException("Group "+groupName+" does not exist"); } Map newGroupMap = Maps.newHashMap(rsGroupMap); @@ -408,18 +404,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { for(String entry: masterServices.getTableDescriptors().getAll().keySet()) { orphanTables.add(TableName.valueOf(entry)); } - - List specialTables = - new ArrayList(Arrays.asList(AccessControlLists.ACL_TABLE_NAME, - TableName.META_TABLE_NAME, TableName.NAMESPACE_TABLE_NAME, RSGROUP_TABLE_NAME)); - // if quota is enabled, add corresponding system table to special tables list - if (QuotaUtil.isQuotaEnabled(conn.getConfiguration())) { - specialTables.add(QuotaTableUtil.QUOTA_TABLE_NAME); - } - - for (TableName table : specialTables) { - orphanTables.add(table); - } for (RSGroupInfo group: groupList) { if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { orphanTables.removeAll(group.getTables()); From 079f168c5c17ddef3a5062e7b75acfde9b73c7e6 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 22 May 2018 13:16:11 -0700 Subject: [PATCH 2/4] HBASE-20620 HBASE-20564 Tighter ByteBufferKeyValue Cell Comparator; part 2 Adds new stripped-down, faster ByteBufferKeyValue comparator (BBKV is the base Cell-type in hbase2). Creates an instance of new Comparator each time we create new memstore rather than use the universal CellComparator. Remove unused and unneeded Interfaces from Cell base type. --- .../apache/hadoop/hbase/BBKVComparator.java | 173 ++++++++++++++++++ .../apache/hadoop/hbase/CellComparator.java | 8 + .../hadoop/hbase/CellComparatorImpl.java | 123 +++---------- .../org/apache/hadoop/hbase/ExtendedCell.java | 14 +- .../hbase/IndividualBytesFieldCell.java | 2 +- .../org/apache/hadoop/hbase/KeyValue.java | 2 +- .../hadoop/hbase/util/ByteBufferUtils.java | 14 +- .../org/apache/hadoop/hbase/util/Bytes.java | 2 +- .../hadoop/hbase/util/UnsafeAccess.java | 32 ++-- .../hadoop/hbase/TestByteBufferKeyValue.java | 18 +- .../apache/hadoop/hbase/io/hfile/HFile.java | 4 +- .../hadoop/hbase/regionserver/CellSet.java | 2 +- .../hbase/regionserver/ChunkCreator.java | 2 +- .../hadoop/hbase/regionserver/HRegion.java | 25 +-- .../hbase/regionserver/MemStoreLABImpl.java | 15 +- .../apache/hadoop/hbase/wal/WALFactory.java | 6 +- .../hadoop/hbase/TestTagRewriteCell.java | 9 +- 17 files changed, 300 insertions(+), 151 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java new file mode 100644 index 00000000000..c15d32110ad --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/BBKVComparator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.Comparator; + +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hbase.thirdparty.com.google.common.primitives.Longs; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A comparator for case where {@link ByteBufferKeyValue} is prevalent type (BBKV + * is base-type in hbase2). Takes a general comparator as fallback in case types are NOT the + * expected ByteBufferKeyValue. + * + *

This is a tricked-out Comparator at heart of hbase read and write. It is in + * the HOT path so we try all sorts of ugly stuff so we can go faster. See below + * in this javadoc comment for the list. + * + *

Apply this comparator narrowly so it is fed exclusively ByteBufferKeyValues + * as much as is possible so JIT can settle (e.g. make one per ConcurrentSkipListMap + * in HStore). + * + *

Exploits specially added methods in BBKV to save on deserializations of shorts, + * longs, etc: i.e. calculating the family length requires row length; pass it in + * rather than recalculate it, and so on. + * + *

This comparator does static dispatch to private final methods so hotspot is comfortable + * deciding inline. + * + *

Measurement has it that we almost have it so all inlines from memstore + * ConcurrentSkipListMap on down to the (unsafe) intrinisics that do byte compare + * and deserialize shorts and ints; needs a bit more work. + * + *

Does not take a Type to compare: i.e. it is not a Comparator<Cell> or + * CellComparator<Cell> or Comparator<ByteBufferKeyValue> because that adds + * another method to the hierarchy -- from compare(Object, Object) + * to dynamic compare(Cell, Cell) to static private compare -- and inlining doesn't happen if + * hierarchy is too deep (it is the case here). + * + *

Be careful making changes. Compare perf before and after and look at what + * hotspot ends up generating before committing change (jitwatch is helpful here). + * Changing this one class doubled write throughput (HBASE-20483). + */ +@InterfaceAudience.Private +public class BBKVComparator implements Comparator { + protected static final Logger LOG = LoggerFactory.getLogger(BBKVComparator.class); + private final Comparator fallback; + + public BBKVComparator(Comparator fallback) { + this.fallback = fallback; + } + + @Override + public int compare(Object l, Object r) { + // LOG.info("ltype={} rtype={}", l, r); + if ((l instanceof ByteBufferKeyValue) && (r instanceof ByteBufferKeyValue)) { + return compare((ByteBufferKeyValue)l, (ByteBufferKeyValue)r); + } + // Skip calling compare(Object, Object) and go direct to compare(Cell, Cell) + return this.fallback.compare((Cell)l, (Cell)r); + } + + // TODO: Come back here. We get a few percentage points extra of throughput if this is a + // private method. + static final int compare(ByteBufferKeyValue left, ByteBufferKeyValue right) { + // NOTE: Same method is in CellComparatorImpl, also private, not shared, intentionally. Not + // sharing gets us a few percent more throughput in compares. If changes here or there, make + // sure done in both places. + + // Compare Rows. Cache row length. + int leftRowLength = left.getRowLength(); + int rightRowLength = right.getRowLength(); + int diff = ByteBufferUtils.compareTo(left.getRowByteBuffer(), left.getRowPosition(), + leftRowLength, + right.getRowByteBuffer(), right.getRowPosition(), rightRowLength); + if (diff != 0) { + return diff; + } + + // If the column is not specified, the "minimum" key type appears as latest in the sorted + // order, regardless of the timestamp. This is used for specifying the last key/value in a + // given row, because there is no "lexicographically last column" (it would be infinitely long). + // The "maximum" key type does not need this behavior. Copied from KeyValue. This is bad in that + // we can't do memcmp w/ special rules like this. + // TODO: Is there a test for this behavior? + int leftFamilyLengthPosition = left.getFamilyLengthPosition(leftRowLength); + int leftFamilyLength = left.getFamilyLength(leftFamilyLengthPosition); + int leftKeyLength = left.getKeyLength(); + int leftQualifierLength = left.getQualifierLength(leftKeyLength, leftRowLength, + leftFamilyLength); + + // No need of left row length below here. + + byte leftType = left.getTypeByte(leftKeyLength); + if (leftFamilyLength + leftQualifierLength == 0 && + leftType == KeyValue.Type.Minimum.getCode()) { + // left is "bigger", i.e. it appears later in the sorted order + return 1; + } + + int rightFamilyLengthPosition = right.getFamilyLengthPosition(rightRowLength); + int rightFamilyLength = right.getFamilyLength(rightFamilyLengthPosition); + int rightKeyLength = right.getKeyLength(); + int rightQualifierLength = right.getQualifierLength(rightKeyLength, rightRowLength, + rightFamilyLength); + + // No need of right row length below here. + + byte rightType = right.getTypeByte(rightKeyLength); + if (rightFamilyLength + rightQualifierLength == 0 && + rightType == KeyValue.Type.Minimum.getCode()) { + return -1; + } + + // Compare families. + int leftFamilyPosition = left.getFamilyPosition(leftFamilyLengthPosition); + int rightFamilyPosition = right.getFamilyPosition(rightFamilyLengthPosition); + diff = ByteBufferUtils.compareTo(left.getFamilyByteBuffer(), leftFamilyPosition, + leftFamilyLength, + right.getFamilyByteBuffer(), rightFamilyPosition, rightFamilyLength); + if (diff != 0) { + return diff; + } + + // Compare qualifiers + diff = ByteBufferUtils.compareTo(left.getQualifierByteBuffer(), + left.getQualifierPosition(leftFamilyPosition, leftFamilyLength), leftQualifierLength, + right.getQualifierByteBuffer(), + right.getQualifierPosition(rightFamilyPosition, rightFamilyLength), + rightQualifierLength); + if (diff != 0) { + return diff; + } + + // Timestamps. + // Swap order we pass into compare so we get DESCENDING order. + diff = Long.compare(right.getTimestamp(rightKeyLength), left.getTimestamp(leftKeyLength)); + if (diff != 0) { + return diff; + } + + // Compare types. Let the delete types sort ahead of puts; i.e. types + // of higher numbers sort before those of lesser numbers. Maximum (255) + // appears ahead of everything, and minimum (0) appears after + // everything. + diff = (0xff & rightType) - (0xff & leftType); + if (diff != 0) { + return diff; + } + + // Negate following comparisons so later edits show up first mvccVersion: later sorts first + return Longs.compare(right.getSequenceId(), left.getSequenceId()); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java index 60be67082bc..3529d54f7df 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java @@ -21,6 +21,7 @@ import java.util.Comparator; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; + /** * Comparator for comparing cells and has some specialized methods that allows comparing individual * cell components like row, family, qualifier and timestamp @@ -130,4 +131,11 @@ public interface CellComparator extends Comparator { * timestamp 0 if both timestamps are equal */ int compareTimestamps(long leftCellts, long rightCellts); + + /** + * @return A dumbed-down, fast comparator for hbase2 base-type, the {@link ByteBufferKeyValue}. + * Create an instance when you make a new memstore, when you know only BBKVs will be passed. + * Do not pollute with types other than BBKV if can be helped; the Comparator will slow. + */ + Comparator getSimpleComparator(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java index 785d8ffb1c3..c2f2ea55d52 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java @@ -18,15 +18,18 @@ package org.apache.hadoop.hbase; +import java.util.Comparator; + import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.common.primitives.Longs; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.primitives.Longs; + /** * Compare two HBase cells. Do not use this method comparing -ROOT- or @@ -34,9 +37,13 @@ import org.apache.hbase.thirdparty.com.google.common.primitives.Longs; * takes account of the special formatting of the row where we have commas to delimit table from * regionname, from row. See KeyValue for how it has a special comparator to do hbase:meta cells * and yet another for -ROOT-. - * While using this comparator for {{@link #compareRows(Cell, Cell)} et al, the hbase:meta cells + *

While using this comparator for {{@link #compareRows(Cell, Cell)} et al, the hbase:meta cells * format should be taken into consideration, for which the instance of this comparator * should be used. In all other cases the static APIs in this comparator would be enough + *

HOT methods. We spend a good portion of CPU comparing. Anything that makes the compare + * faster will likely manifest at the macro level. See also + * {@link BBKVComparator}. Use it when mostly {@link ByteBufferKeyValue}s. + *

*/ @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="UNKNOWN", @@ -57,21 +64,17 @@ public class CellComparatorImpl implements CellComparator { public static final CellComparatorImpl META_COMPARATOR = new MetaCellComparator(); @Override - public int compare(Cell a, Cell b) { + public final int compare(final Cell a, final Cell b) { return compare(a, b, false); } - /** - * Compare cells. - * @param ignoreSequenceid True if we are to compare the key portion only and ignore - * the sequenceid. Set to false to compare key and consider sequenceid. - * @return 0 if equal, -1 if a < b, and +1 if a > b. - */ @Override public int compare(final Cell a, final Cell b, boolean ignoreSequenceid) { + int diff = 0; + // "Peel off" the most common path. if (a instanceof ByteBufferKeyValue && b instanceof ByteBufferKeyValue) { - diff = compareByteBufferKeyValue((ByteBufferKeyValue)a, (ByteBufferKeyValue)b); + diff = BBKVComparator.compare((ByteBufferKeyValue)a, (ByteBufferKeyValue)b); if (diff != 0) { return diff; } @@ -88,82 +91,7 @@ public class CellComparatorImpl implements CellComparator { } // Negate following comparisons so later edits show up first mvccVersion: later sorts first - return ignoreSequenceid? diff: Longs.compare(b.getSequenceId(), a.getSequenceId()); - } - - /** - * Specialized comparator for the ByteBufferKeyValue type exclusivesly. - * Caches deserialized lengths of rows and families, etc., and reuses them where it can - * (ByteBufferKeyValue has been changed to be amenable to our providing pre-made lengths, etc.) - */ - private static final int compareByteBufferKeyValue(ByteBufferKeyValue left, - ByteBufferKeyValue right) { - // Compare Rows. Cache row length. - int leftRowLength = left.getRowLength(); - int rightRowLength = right.getRowLength(); - int diff = ByteBufferUtils.compareTo( - left.getRowByteBuffer(), left.getRowPosition(), leftRowLength, - right.getRowByteBuffer(), right.getRowPosition(), rightRowLength); - if (diff != 0) { - return diff; - } - - // If the column is not specified, the "minimum" key type appears the - // latest in the sorted order, regardless of the timestamp. This is used - // for specifying the last key/value in a given row, because there is no - // "lexicographically last column" (it would be infinitely long). The - // "maximum" key type does not need this behavior. - // Copied from KeyValue. This is bad in that we can't do memcmp w/ special rules like this. - // I tried to get rid of the above but scanners depend on it. TODO. - int leftFamilyLengthPosition = left.getFamilyLengthPosition(leftRowLength); - int leftFamilyLength = left.getFamilyLength(leftFamilyLengthPosition); - int rightFamilyLengthPosition = right.getFamilyLengthPosition(rightRowLength); - int rightFamilyLength = right.getFamilyLength(rightFamilyLengthPosition); - int leftKeyLength = left.getKeyLength(); - int leftQualifierLength = left.getQualifierLength(leftKeyLength, leftRowLength, - leftFamilyLength); - byte leftType = left.getTypeByte(leftKeyLength); - if (leftFamilyLength + leftQualifierLength == 0 && leftType == Type.Minimum.getCode()) { - // left is "bigger", i.e. it appears later in the sorted order - return 1; - } - int rightKeyLength = right.getKeyLength(); - int rightQualifierLength = right.getQualifierLength(rightKeyLength, rightRowLength, - rightFamilyLength); - byte rightType = right.getTypeByte(rightKeyLength); - if (rightFamilyLength + rightQualifierLength == 0 && rightType == Type.Minimum.getCode()) { - return -1; - } - - // Compare families. - int leftFamilyPosition = left.getFamilyPosition(leftFamilyLengthPosition); - int rightFamilyPosition = right.getFamilyPosition(rightFamilyLengthPosition); - diff = ByteBufferUtils.compareTo( - left.getFamilyByteBuffer(), leftFamilyPosition, leftFamilyLength, - right.getFamilyByteBuffer(), rightFamilyPosition, rightFamilyLength); - if (diff != 0) { - return diff; - } - // Compare qualifiers - diff = ByteBufferUtils.compareTo(left.getQualifierByteBuffer(), - left.getQualifierPosition(leftFamilyPosition, leftFamilyLength), leftQualifierLength, - right.getQualifierByteBuffer(), - right.getQualifierPosition(rightFamilyPosition, rightFamilyLength), - rightQualifierLength); - if (diff != 0) { - return diff; - } - // Timestamps. - diff = compareTimestampsInternal(left.getTimestamp(leftKeyLength), - right.getTimestamp(rightKeyLength)); - if (diff != 0) { - return diff; - } - // Compare types. Let the delete types sort ahead of puts; i.e. types - // of higher numbers sort before those of lesser numbers. Maximum (255) - // appears ahead of everything, and minimum (0) appears after - // everything. - return (0xff & rightType) - (0xff & leftType); + return ignoreSequenceid? diff: Long.compare(b.getSequenceId(), a.getSequenceId()); } /** @@ -254,7 +182,7 @@ public class CellComparatorImpl implements CellComparator { return compareRows(left, left.getRowLength(), right, right.getRowLength()); } - int compareRows(final Cell left, int leftRowLength, final Cell right, int rightRowLength) { + static int compareRows(final Cell left, int leftRowLength, final Cell right, int rightRowLength) { // left and right can be exactly the same at the beginning of a row if (left == right) { return 0; @@ -341,7 +269,7 @@ public class CellComparatorImpl implements CellComparator { return diff; } - diff = compareTimestamps(left, right); + diff = compareTimestamps(left.getTimestamp(), right.getTimestamp()); if (diff != 0) { return diff; } @@ -355,16 +283,12 @@ public class CellComparatorImpl implements CellComparator { @Override public int compareTimestamps(final Cell left, final Cell right) { - return compareTimestampsInternal(left.getTimestamp(), right.getTimestamp()); + return compareTimestamps(left.getTimestamp(), right.getTimestamp()); } @Override public int compareTimestamps(final long ltimestamp, final long rtimestamp) { - return compareTimestampsInternal(ltimestamp, rtimestamp); - } - - private static final int compareTimestampsInternal(final long ltimestamp, final long rtimestamp) { - // Swap the times so sort is newest to oldest, descending. + // Swap order we pass into compare so we get DESCENDING order. return Long.compare(rtimestamp, ltimestamp); } @@ -373,6 +297,7 @@ public class CellComparatorImpl implements CellComparator { * {@link KeyValue}s. */ public static class MetaCellComparator extends CellComparatorImpl { + // TODO: Do we need a ByteBufferKeyValue version of this? @Override public int compareRows(final Cell left, final Cell right) { return compareRows(left.getRowArray(), left.getRowOffset(), left.getRowLength(), @@ -451,5 +376,15 @@ public class CellComparatorImpl implements CellComparator { right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset)); return result; } + + @Override + public Comparator getSimpleComparator() { + return this; + } + } + + @Override + public Comparator getSimpleComparator() { + return new BBKVComparator(this); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 07b0e3f7986..8efe88b8e0c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -30,9 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience; * must implement this. */ @InterfaceAudience.Private -public interface ExtendedCell extends RawCell, HeapSize, Cloneable { - +public interface ExtendedCell extends RawCell, HeapSize { int CELL_NOT_BASED_ON_CHUNK = -1; + /** * Write this cell to an OutputStream in a {@link KeyValue} format. *
KeyValue format
@@ -87,6 +87,13 @@ public interface ExtendedCell extends RawCell, HeapSize, Cloneable { getValueLength(), getTagsLength(), withTags); } + /** + * @return Serialized size (defaults to include tag length). + */ + default int getSerializedSize() { + return getSerializedSize(true); + } + /** * Write this Cell into the given buf's offset in a {@link KeyValue} format. * @param buf The buffer where to write the Cell. @@ -108,7 +115,8 @@ public interface ExtendedCell extends RawCell, HeapSize, Cloneable { /** * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized * chunks as in case of MemstoreLAB - * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1 + * @return the chunk id if the cell is backed by fixed sized Chunks, else return + * {@link #CELL_NOT_BASED_ON_CHUNK}; i.e. -1. */ default int getChunkId() { return CELL_NOT_BASED_ON_CHUNK; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java index 7093b4b2dcf..e47702389f1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/IndividualBytesFieldCell.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.util.ClassSize; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class IndividualBytesFieldCell implements ExtendedCell { +public class IndividualBytesFieldCell implements ExtendedCell, Cloneable { private static final long FIXED_OVERHEAD = ClassSize.align( // do alignment(padding gap) ClassSize.OBJECT // object header diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index f3bfbd3fd02..f7f6c0ded46 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -76,7 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * length and actual tag bytes length. */ @InterfaceAudience.Private -public class KeyValue implements ExtendedCell { +public class KeyValue implements ExtendedCell, Cloneable { private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList<>(); private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 2246002eea4..11745a4d96d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -50,8 +50,7 @@ public final class ByteBufferUtils { public final static int NEXT_BIT_MASK = 1 << 7; @VisibleForTesting final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable(); - @VisibleForTesting - final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned(); + public final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned(); private ByteBufferUtils() { } @@ -630,6 +629,8 @@ public final class ByteBufferUtils { } public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) { + // NOTE: This method is copied over in BBKVComparator!!!!! For perf reasons. If you make + // changes here, make them there too!!!! if (UNSAFE_UNALIGNED) { long offset1Adj, offset2Adj; Object refObj1 = null, refObj2 = null; @@ -671,9 +672,12 @@ public final class ByteBufferUtils { return compareTo(buf1, o1, l1, buf2, o2, l2) == 0; } + // The below two methods show up in lots of places. Versions of them in commons util and in + // Cassandra. In guava too? They are copied from ByteBufferUtils. They are here as static + // privates. Seems to make code smaller and make Hotspot happier (comes of compares and study + // of compiled code via jitwatch). + public static int compareTo(byte [] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) { - // This method is nearly same as the compareTo that follows but hard sharing code given - // byte array and bytebuffer types and this is a hot code path if (UNSAFE_UNALIGNED) { long offset2Adj; Object refObj2 = null; @@ -738,7 +742,7 @@ public final class ByteBufferUtils { long lw = UnsafeAccess.theUnsafe.getLong(obj1, o1 + (long) i); long rw = UnsafeAccess.theUnsafe.getLong(obj2, o2 + (long) i); if (lw != rw) { - if (!UnsafeAccess.littleEndian) { + if (!UnsafeAccess.LITTLE_ENDIAN) { return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 6eb09c11df4..15facea3071 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -1549,7 +1549,7 @@ public class Bytes implements Comparable { long lw = theUnsafe.getLong(buffer1, offset1Adj + i); long rw = theUnsafe.getLong(buffer2, offset2Adj + i); if (lw != rw) { - if(!UnsafeAccess.littleEndian) { + if(!UnsafeAccess.LITTLE_ENDIAN) { return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index 486f81beb6d..953ad5b533e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -42,7 +42,7 @@ public final class UnsafeAccess { /** The offset to the first element in a byte array. */ public static final long BYTE_ARRAY_BASE_OFFSET; - static final boolean littleEndian = ByteOrder.nativeOrder() + public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() .equals(ByteOrder.LITTLE_ENDIAN); // This number limits the number of bytes to copy per call to Unsafe's @@ -81,7 +81,7 @@ public final class UnsafeAccess { * @return the short value */ public static short toShort(byte[] bytes, int offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Short.reverseBytes(theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET)); } else { return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET); @@ -95,7 +95,7 @@ public final class UnsafeAccess { * @return the int value */ public static int toInt(byte[] bytes, int offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET)); } else { return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET); @@ -109,7 +109,7 @@ public final class UnsafeAccess { * @return the long value */ public static long toLong(byte[] bytes, int offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Long.reverseBytes(theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET)); } else { return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET); @@ -125,7 +125,7 @@ public final class UnsafeAccess { * @return incremented offset */ public static int putShort(byte[] bytes, int offset, short val) { - if (littleEndian) { + if (LITTLE_ENDIAN) { val = Short.reverseBytes(val); } theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val); @@ -140,7 +140,7 @@ public final class UnsafeAccess { * @return incremented offset */ public static int putInt(byte[] bytes, int offset, int val) { - if (littleEndian) { + if (LITTLE_ENDIAN) { val = Integer.reverseBytes(val); } theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val); @@ -155,7 +155,7 @@ public final class UnsafeAccess { * @return incremented offset */ public static int putLong(byte[] bytes, int offset, long val) { - if (littleEndian) { + if (LITTLE_ENDIAN) { val = Long.reverseBytes(val); } theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val); @@ -172,7 +172,7 @@ public final class UnsafeAccess { * @return short value at offset */ public static short toShort(ByteBuffer buf, int offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Short.reverseBytes(getAsShort(buf, offset)); } return getAsShort(buf, offset); @@ -186,7 +186,7 @@ public final class UnsafeAccess { * @return short value at offset */ public static short toShort(Object ref, long offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Short.reverseBytes(theUnsafe.getShort(ref, offset)); } return theUnsafe.getShort(ref, offset); @@ -214,7 +214,7 @@ public final class UnsafeAccess { * @return int value at offset */ public static int toInt(ByteBuffer buf, int offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Integer.reverseBytes(getAsInt(buf, offset)); } return getAsInt(buf, offset); @@ -228,7 +228,7 @@ public final class UnsafeAccess { * @return int value at offset */ public static int toInt(Object ref, long offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Integer.reverseBytes(theUnsafe.getInt(ref, offset)); } return theUnsafe.getInt(ref, offset); @@ -256,7 +256,7 @@ public final class UnsafeAccess { * @return long value at offset */ public static long toLong(ByteBuffer buf, int offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Long.reverseBytes(getAsLong(buf, offset)); } return getAsLong(buf, offset); @@ -270,7 +270,7 @@ public final class UnsafeAccess { * @return long value at offset */ public static long toLong(Object ref, long offset) { - if (littleEndian) { + if (LITTLE_ENDIAN) { return Long.reverseBytes(theUnsafe.getLong(ref, offset)); } return theUnsafe.getLong(ref, offset); @@ -297,7 +297,7 @@ public final class UnsafeAccess { * @return incremented offset */ public static int putInt(ByteBuffer buf, int offset, int val) { - if (littleEndian) { + if (LITTLE_ENDIAN) { val = Integer.reverseBytes(val); } if (buf.isDirect()) { @@ -402,7 +402,7 @@ public final class UnsafeAccess { * @return incremented offset */ public static int putShort(ByteBuffer buf, int offset, short val) { - if (littleEndian) { + if (LITTLE_ENDIAN) { val = Short.reverseBytes(val); } if (buf.isDirect()) { @@ -421,7 +421,7 @@ public final class UnsafeAccess { * @return incremented offset */ public static int putLong(ByteBuffer buf, int offset, long val) { - if (littleEndian) { + if (LITTLE_ENDIAN) { val = Long.reverseBytes(val); } if (buf.isDirect()) { diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java index 706941154d6..6443d84ebd2 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestByteBufferKeyValue.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentSkipListMap; + import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -67,9 +69,23 @@ public class TestByteBufferKeyValue { assertTrue(CellComparatorImpl.COMPARATOR.compare(cell1, cell3) < 0); Cell cell4 = getOffheapCell(row1, Bytes.toBytes("f"), qual2); assertTrue(CellComparatorImpl.COMPARATOR.compare(cell1, cell4) > 0); + BBKVComparator comparator = new BBKVComparator(null); + assertTrue(comparator.compare(cell1, cell2) < 0); + assertTrue(comparator.compare(cell1, cell3) < 0); + assertTrue(comparator.compare(cell1, cell4) > 0); + ByteBuffer buf = ByteBuffer.allocate(row1.length); + ByteBufferUtils.copyFromArrayToBuffer(buf, row1, 0, row1.length); + + ConcurrentSkipListMap map = + new ConcurrentSkipListMap<>(comparator); + map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1); + map.put((ByteBufferKeyValue)cell2, (ByteBufferKeyValue)cell2); + map.put((ByteBufferKeyValue)cell3, (ByteBufferKeyValue)cell3); + map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1); + map.put((ByteBufferKeyValue)cell1, (ByteBufferKeyValue)cell1); } - private Cell getOffheapCell(byte [] row, byte [] family, byte [] qualifier) { + private static Cell getOffheapCell(byte [] row, byte [] family, byte [] qualifier) { KeyValue kvCell = new KeyValue(row, family, qualifier, 0L, Type.Put, row); ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length); ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 4c9f6f6d659..5bcaa172d1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -332,8 +332,8 @@ public class HFile { try { ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction()); } catch (UnsupportedOperationException uoe) { - if (LOG.isTraceEnabled()) LOG.trace("Unable to set drop behind on " + path, uoe); - else if (LOG.isDebugEnabled()) LOG.debug("Unable to set drop behind on " + path); + LOG.trace("Unable to set drop behind on {}", path, uoe); + LOG.debug("Unable to set drop behind on {}", path.getName()); } } return new HFileWriterImpl(conf, cacheConf, path, ostream, comparator, fileContext); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java index a4fe883668c..94a256d9341 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java @@ -53,7 +53,7 @@ public class CellSet implements NavigableSet { private final int numUniqueKeys; CellSet(final CellComparator c) { - this.delegatee = new ConcurrentSkipListMap<>(c); + this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator()); this.numUniqueKeys = UNKNOWN_NUM_UNIQUES; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index 5577be47ad2..44ba65f6208 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -46,7 +46,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto @InterfaceAudience.Private public class ChunkCreator { private static final Logger LOG = LoggerFactory.getLogger(ChunkCreator.class); - // monotonically increasing chunkid + // monotonically increasing chunkid. Starts at 1. private AtomicInteger chunkID = new AtomicInteger(1); // maps the chunk against the monotonically increasing chunk id. We need to preserve the // natural ordering of the key diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 49263984057..c0203a49052 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2552,8 +2552,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } MemStoreSize mss = this.memStoreSizing.getMemStoreSize(); LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + " column families," + - " memstore data size=" + StringUtils.byteDesc(mss.getDataSize()) + - " memstore heap size=" + StringUtils.byteDesc(mss.getHeapSize()) + + " dataSize=" + StringUtils.byteDesc(mss.getDataSize()) + + " heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); } @@ -2753,10 +2753,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long time = EnvironmentEdgeManager.currentTime() - startTime; MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize(); long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize(); - String msg = "Finished memstore flush;" - + " data size ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize() - + ", heap size ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize() - + ", currentsize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + String msg = "Finished flush of" + + " dataSize ~" + StringUtils.byteDesc(mss.getDataSize()) + "/" + mss.getDataSize() + + ", heapSize ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize() + + ", currentSize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid=" + flushOpSeqId + ", compaction requested=" + compactionRequested + ((wal == null) ? "; wal=null" : ""); @@ -4236,7 +4236,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param cellItr * @param now */ - public void updateCellTimestamps(final Iterable> cellItr, final byte[] now) + private static void updateCellTimestamps(final Iterable> cellItr, final byte[] now) throws IOException { for (List cells: cellItr) { if (cells == null) continue; @@ -4291,12 +4291,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi requestFlush(); // Don't print current limit because it will vary too much. The message is used as a key // over in RetriesExhaustedWithDetailsException processing. - throw new RegionTooBusyException("Over memstore limit; regionName=" + + throw new RegionTooBusyException("Over memstore limit=" + + org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize) + + ", regionName=" + (this.getRegionInfo() == null? "unknown": this.getRegionInfo().getEncodedName()) + - ", server=" + (this.getRegionServerServices() == null ? "unknown": - this.getRegionServerServices().getServerName()) + - ", blockingMemStoreSize=" + - org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(blockingMemStoreSize)); + ", server=" + (this.getRegionServerServices() == null? "unknown": + this.getRegionServerServices().getServerName())); } } @@ -8602,4 +8602,5 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public void requestFlush(FlushLifeCycleTracker tracker) throws IOException { requestFlush0(tracker); } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 0db0fd9c285..ac7223f2d8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -120,12 +120,13 @@ public class MemStoreLABImpl implements MemStoreLAB { */ @Override public Cell forceCopyOfBigCellInto(Cell cell) { - int size = KeyValueUtil.length(cell) + ChunkCreator.SIZEOF_CHUNK_HEADER; + int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize(): + KeyValueUtil.length(cell); + size += ChunkCreator.SIZEOF_CHUNK_HEADER; Preconditions.checkArgument(size >= 0, "negative size"); if (size <= dataChunkSize) { // Using copyCellInto for cells which are bigger than the original maxAlloc - Cell newCell = copyCellInto(cell, dataChunkSize); - return newCell; + return copyCellInto(cell, dataChunkSize); } else { Chunk c = getNewExternalChunk(size); int allocOffset = c.alloc(size); @@ -134,7 +135,8 @@ public class MemStoreLABImpl implements MemStoreLAB { } private Cell copyCellInto(Cell cell, int maxAlloc) { - int size = KeyValueUtil.length(cell); + int size = cell instanceof ExtendedCell? ((ExtendedCell)cell).getSerializedSize(): + KeyValueUtil.length(cell); Preconditions.checkArgument(size >= 0, "negative size"); // Callers should satisfy large allocations directly from JVM since they // don't cause fragmentation as badly. @@ -169,7 +171,7 @@ public class MemStoreLABImpl implements MemStoreLAB { * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid * out of it */ - private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { + private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { int tagsLen = cell.getTagsLength(); if (cell instanceof ExtendedCell) { ((ExtendedCell) cell).write(buf, offset); @@ -255,8 +257,7 @@ public class MemStoreLABImpl implements MemStoreLAB { */ private Chunk getOrMakeChunk() { // Try to get the chunk - Chunk c; - c = currChunk.get(); + Chunk c = currChunk.get(); if (c != null) { return c; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 7727b108c67..24604d988e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -47,9 +47,11 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * implementations: *
    *
  • defaultProvider : whatever provider is standard for the hbase version. Currently - * "filesystem"
  • + * "asyncfs" + *
  • asyncfs : a provider that will run on top of an implementation of the Hadoop + * FileSystem interface via an asynchronous client.
  • *
  • filesystem : a provider that will run on top of an implementation of the Hadoop - * FileSystem interface, normally HDFS.
  • + * FileSystem interface via HDFS's synchronous DFSClient. *
  • multiwal : a provider that will use multiple "filesystem" wal instances per region * server.
  • *
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java index dc47661166d..d852d2db2ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestTagRewriteCell.java @@ -47,9 +47,10 @@ public class TestTagRewriteCell { // VisibilityController and AccessController) Cell trCell2 = PrivateCellUtil.createCell(trCell, new byte[fakeTagArrayLength]); - assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be larger than a " + - "single TagRewriteCell's heapsize", trCellHeapSize < ((HeapSize)trCell2).heapSize()); - assertTrue("TagRewriteCell should have had nulled out tags array", ((HeapSize)trCell).heapSize() < - trCellHeapSize); + assertTrue("TagRewriteCell containing a TagRewriteCell's heapsize should be " + + "larger than a single TagRewriteCell's heapsize", + trCellHeapSize < ((HeapSize)trCell2).heapSize()); + assertTrue("TagRewriteCell should have had nulled out tags array", + ((HeapSize)trCell).heapSize() < trCellHeapSize); } } From 3a805074a2f5a143fbc35bd357a609341fea34c4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 23 May 2018 15:51:04 -0700 Subject: [PATCH 3/4] HBASE-20627 Relocate RS Group pre/post hooks from RSGroupAdminServer to RSGroupAdminEndpoint --- .../hbase/rsgroup/RSGroupAdminEndpoint.java | 48 +++++- .../hbase/rsgroup/RSGroupAdminServer.java | 44 ------ .../hadoop/hbase/rsgroup/TestRSGroups.java | 138 +++++++++++++++++- 3 files changed, 182 insertions(+), 48 deletions(-) diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java index 6f3c1d1f15f..365082682be 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminEndpoint.java @@ -202,8 +202,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup " + request.getTargetGroup()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveServers(hostPorts, request.getTargetGroup()); + } checkPermission("moveServers"); groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveServers(hostPorts, request.getTargetGroup()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -221,8 +227,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup " + request.getTargetGroup()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveTables(tables, request.getTargetGroup()); + } checkPermission("moveTables"); groupAdminServer.moveTables(tables, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveTables(tables, request.getTargetGroup()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -235,8 +247,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preAddRSGroup(request.getRSGroupName()); + } checkPermission("addRSGroup"); groupAdminServer.addRSGroup(request.getRSGroupName()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postAddRSGroup(request.getRSGroupName()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -250,8 +268,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { RemoveRSGroupResponse.newBuilder(); LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preRemoveRSGroup(request.getRSGroupName()); + } checkPermission("removeRSGroup"); groupAdminServer.removeRSGroup(request.getRSGroupName()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postRemoveRSGroup(request.getRSGroupName()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -265,8 +289,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group=" + request.getRSGroupName()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preBalanceRSGroup(request.getRSGroupName()); + } checkPermission("balanceRSGroup"); - builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName())); + boolean balancerRan = groupAdminServer.balanceRSGroup(request.getRSGroupName()); + builder.setBalanceRan(balancerRan); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postBalanceRSGroup(request.getRSGroupName(), + balancerRan); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); builder.setBalanceRan(false); @@ -325,8 +357,16 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts + " and tables " + tables + " to rsgroup" + request.getTargetGroup()); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preMoveServersAndTables(hostPorts, tables, + request.getTargetGroup()); + } checkPermission("moveServersAndTables"); groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup()); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postMoveServersAndTables(hostPorts, tables, + request.getTargetGroup()); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } @@ -346,8 +386,14 @@ public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { LOG.info(master.getClientIdAuditPrefix() + " remove decommissioned servers from rsgroup: " + servers); try { + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().preRemoveServers(servers); + } checkPermission("removeServers"); groupAdminServer.removeServers(servers); + if (master.getMasterCoprocessorHost() != null) { + master.getMasterCoprocessorHost().postRemoveServers(servers); + } } catch (IOException e) { CoprocessorRpcUtils.setControllerException(controller, e); } diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 670e8aa62ba..b39d3a19a1b 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -291,9 +291,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveServers(servers, targetGroupName); - } // Presume first server's source group. Later ensure all servers are from this group. Address firstServer = servers.iterator().next(); RSGroupInfo srcGrp = rsGroupInfoManager.getRSGroupOfServer(firstServer); @@ -370,9 +367,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } while (foundRegionsToMove); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName); - } LOG.info("Move server done: " + srcGrp.getName() + "=>" + targetGroupName); } } @@ -390,9 +384,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveTables(tables, targetGroup); - } if(targetGroup != null) { RSGroupInfo destGroup = rsGroupInfoManager.getRSGroup(targetGroup); if(destGroup == null) { @@ -430,22 +421,12 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } } - - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup); - } } } @Override public void addRSGroup(String name) throws IOException { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preAddRSGroup(name); - } rsGroupInfoManager.addRSGroup(new RSGroupInfo(name)); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postAddRSGroup(name); - } } @Override @@ -453,9 +434,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preRemoveRSGroup(name); - } RSGroupInfo rsGroupInfo = rsGroupInfoManager.getRSGroup(name); if (rsGroupInfo == null) { throw new ConstraintException("RSGroup " + name + " does not exist"); @@ -480,9 +458,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { } } rsGroupInfoManager.removeRSGroup(name); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postRemoveRSGroup(name); - } } } @@ -498,9 +473,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { return false; } - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preBalanceRSGroup(groupName); - } if (getRSGroupInfo(groupName) == null) { throw new ConstraintException("RSGroup does not exist: "+groupName); } @@ -542,9 +514,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { LOG.info("RSGroup balance " + groupName + " completed after " + (System.currentTimeMillis()-startTime) + " seconds"); } - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postBalanceRSGroup(groupName, balancerRan); - } return balancerRan; } } @@ -575,9 +544,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers and tables to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preMoveServersAndTables(servers, tables, targetGroup); - } //check servers and tables status checkServersAndTables(servers, tables, targetGroup); @@ -589,10 +555,6 @@ public class RSGroupAdminServer implements RSGroupAdmin { moveRegionsFromServers(servers, tables, targetGroup); //move regions which should belong to these servers moveRegionsToServers(servers, tables, targetGroup); - - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup); - } } LOG.info("Move servers and tables done. Severs :" + servers + " , Tables : " + tables + " => " + targetGroup); @@ -607,15 +569,9 @@ public class RSGroupAdminServer implements RSGroupAdmin { // Hold a lock on the manager instance while moving servers to prevent // another writer changing our state while we are working. synchronized (rsGroupInfoManager) { - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().preRemoveServers(servers); - } //check the set of servers checkForDeadOrOnlineServers(servers); rsGroupInfoManager.removeServers(servers); - if (master.getMasterCoprocessorHost() != null) { - master.getMasterCoprocessorHost().postRemoveServers(servers); - } LOG.info("Remove decommissioned servers " + servers + " from rsgroup done."); } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java index 521b8b91a4c..3e74f819cd3 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java @@ -23,6 +23,9 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.Iterator; +import java.util.Optional; +import java.util.Set; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -36,6 +39,11 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.net.Address; @@ -69,7 +77,7 @@ public class TestRSGroups extends TestRSGroupsBase { protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroups.class); private static boolean INIT = false; private static RSGroupAdminEndpoint rsGroupAdminEndpoint; - + private static CPMasterObserver observer; @BeforeClass public static void setUp() throws Exception { @@ -80,7 +88,7 @@ public class TestRSGroups extends TestRSGroupsBase { HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName()); TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, - RSGroupAdminEndpoint.class.getName()); + RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName()); // Enable quota for testRSGroupsWithHBaseQuota() TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1); @@ -104,8 +112,10 @@ public class TestRSGroups extends TestRSGroupsBase { admin.setBalancerRunning(false,true); rsGroupAdmin = new VerifyingRSGroupAdminClient( new RSGroupAdminClient(TEST_UTIL.getConnection()), TEST_UTIL.getConfiguration()); + MasterCoprocessorHost host = master.getMasterCoprocessorHost(); + observer = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName()); rsGroupAdminEndpoint = (RSGroupAdminEndpoint) - master.getMasterCoprocessorHost().findCoprocessor(RSGroupAdminEndpoint.class.getName()); + host.findCoprocessor(RSGroupAdminEndpoint.class.getName()); } @AfterClass @@ -148,6 +158,7 @@ public class TestRSGroups extends TestRSGroupsBase { } catch (Exception ex) { LOG.warn("Got this on setup, FYI", ex); } + assertTrue(observer.preMoveServersCalled); TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate() { @Override public boolean evaluate() throws Exception { @@ -221,6 +232,9 @@ public class TestRSGroups extends TestRSGroupsBase { String groupName = tablePrefix+"_foo"; LOG.info("testNamespaceConstraint"); rsGroupAdmin.addRSGroup(groupName); + assertTrue(observer.preAddRSGroupCalled); + assertTrue(observer.postAddRSGroupCalled); + admin.createNamespace(NamespaceDescriptor.create(nsName) .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName) .build()); @@ -241,6 +255,8 @@ public class TestRSGroups extends TestRSGroupsBase { //test add non-existent group admin.deleteNamespace(nsName); rsGroupAdmin.removeRSGroup(groupName); + assertTrue(observer.preRemoveRSGroupCalled); + assertTrue(observer.postRemoveRSGroupCalled); try { admin.createNamespace(NamespaceDescriptor.create(nsName) .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo") @@ -261,6 +277,120 @@ public class TestRSGroups extends TestRSGroupsBase { it.next(); } + public static class CPMasterObserver implements MasterCoprocessor, MasterObserver { + boolean preBalanceRSGroupCalled = false; + boolean postBalanceRSGroupCalled = false; + boolean preMoveServersCalled = false; + boolean postMoveServersCalled = false; + boolean preMoveTablesCalled = false; + boolean postMoveTablesCalled = false; + boolean preAddRSGroupCalled = false; + boolean postAddRSGroupCalled = false; + boolean preRemoveRSGroupCalled = false; + boolean postRemoveRSGroupCalled = false; + boolean preRemoveServersCalled = false; + boolean postRemoveServersCalled = false; + boolean preMoveServersAndTables = false; + boolean postMoveServersAndTables = false; + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + @Override + public void preMoveServersAndTables(final ObserverContext ctx, + Set
servers, Set tables, String targetGroup) throws IOException { + preMoveServersAndTables = true; + } + @Override + public void postMoveServersAndTables(final ObserverContext ctx, + Set
servers, Set tables, String targetGroup) throws IOException { + postMoveServersAndTables = true; + } + @Override + public void preRemoveServers( + final ObserverContext ctx, + Set
servers) throws IOException { + preRemoveServersCalled = true; + } + @Override + public void postRemoveServers( + final ObserverContext ctx, + Set
servers) throws IOException { + postRemoveServersCalled = true; + } + @Override + public void preRemoveRSGroup(final ObserverContext ctx, + String name) throws IOException { + preRemoveRSGroupCalled = true; + } + @Override + public void postRemoveRSGroup(final ObserverContext ctx, + String name) throws IOException { + postRemoveRSGroupCalled = true; + } + @Override + public void preAddRSGroup(final ObserverContext ctx, + String name) throws IOException { + preAddRSGroupCalled = true; + } + @Override + public void postAddRSGroup(final ObserverContext ctx, + String name) throws IOException { + postAddRSGroupCalled = true; + } + @Override + public void preMoveTables(final ObserverContext ctx, + Set tables, String targetGroup) throws IOException { + preMoveTablesCalled = true; + } + @Override + public void postMoveTables(final ObserverContext ctx, + Set tables, String targetGroup) throws IOException { + postMoveTablesCalled = true; + } + @Override + public void preMoveServers(final ObserverContext ctx, + Set
servers, String targetGroup) throws IOException { + preMoveServersCalled = true; + } + + @Override + public void postMoveServers(final ObserverContext ctx, + Set
servers, String targetGroup) throws IOException { + postMoveServersCalled = true; + } + @Override + public void preBalanceRSGroup(final ObserverContext ctx, + String groupName) throws IOException { + preBalanceRSGroupCalled = true; + } + @Override + public void postBalanceRSGroup(final ObserverContext ctx, + String groupName, boolean balancerRan) throws IOException { + postBalanceRSGroupCalled = true; + } + } + @Test + public void testMoveServersAndTables() throws Exception { + super.testMoveServersAndTables(); + assertTrue(observer.preMoveServersAndTables); + assertTrue(observer.postMoveServersAndTables); + } + @Test + public void testTableMoveTruncateAndDrop() throws Exception { + super.testTableMoveTruncateAndDrop(); + assertTrue(observer.preMoveTablesCalled); + assertTrue(observer.postMoveTablesCalled); + } + + @Test + public void testRemoveServers() throws Exception { + super.testRemoveServers(); + assertTrue(observer.preRemoveServersCalled); + assertTrue(observer.postRemoveServersCalled); + } + @Test public void testMisplacedRegions() throws Exception { final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions"); @@ -277,6 +407,8 @@ public class TestRSGroups extends TestRSGroupsBase { admin.setBalancerRunning(true,true); assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName())); admin.setBalancerRunning(false,true); + assertTrue(observer.preBalanceRSGroupCalled); + assertTrue(observer.postBalanceRSGroupCalled); TEST_UTIL.waitFor(60000, new Predicate() { @Override From 9fbce1668b1fc1e8a7d4bdb341bbed7bf65936f2 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 17 May 2018 10:30:28 -0700 Subject: [PATCH 4/4] HBASE-20597 Use a lock to serialize access to a shared reference to ZooKeeperWatcher in HBaseReplicationEndpoint --- .../replication/HBaseReplicationEndpoint.java | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index bd5c529092d..8286f7db742 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -43,21 +43,22 @@ import org.slf4j.LoggerFactory; * target cluster is an HBase cluster. */ @InterfaceAudience.Private -@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS", - justification="Thinks zkw needs to be synchronized access but should be fine as is.") public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint implements Abortable { private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); - private ZKWatcher zkw = null; // FindBugs: MT_CORRECTNESS + private Object zkwLock = new Object(); + private ZKWatcher zkw = null; private List regionServers = new ArrayList<>(0); private long lastRegionServerUpdate; protected void disconnect() { - if (zkw != null) { - zkw.close(); + synchronized (zkwLock) { + if (zkw != null) { + zkw.close(); + } } } @@ -112,7 +113,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint public synchronized UUID getPeerUUID() { UUID peerUUID = null; try { - peerUUID = ZKClusterId.getUUIDForCluster(zkw); + synchronized (zkwLock) { + peerUUID = ZKClusterId.getUUIDForCluster(zkw); + } } catch (KeeperException ke) { reconnect(ke); } @@ -124,7 +127,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * @return zk connection */ protected ZKWatcher getZkw() { - return zkw; + synchronized (zkwLock) { + return zkw; + } } /** @@ -132,10 +137,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * @throws IOException If anything goes wrong connecting */ void reloadZkWatcher() throws IOException { - if (zkw != null) zkw.close(); - zkw = new ZKWatcher(ctx.getConfiguration(), + synchronized (zkwLock) { + if (zkw != null) { + zkw.close(); + } + zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); - getZkw().registerListener(new PeerRegionServerListener(this)); + zkw.registerListener(new PeerRegionServerListener(this)); + } } @Override @@ -173,13 +182,15 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * for this peer cluster * @return list of addresses */ - // Synchronize peer cluster connection attempts to avoid races and rate - // limit connections when multiple replication sources try to connect to - // the peer cluster. If the peer cluster is down we can get out of control - // over time. - public synchronized List getRegionServers() { + public List getRegionServers() { try { - setRegionServers(fetchSlavesAddresses(this.getZkw())); + // Synchronize peer cluster connection attempts to avoid races and rate + // limit connections when multiple replication sources try to connect to + // the peer cluster. If the peer cluster is down we can get out of control + // over time. + synchronized (zkwLock) { + setRegionServers(fetchSlavesAddresses(zkw)); + } } catch (KeeperException ke) { if (LOG.isDebugEnabled()) { LOG.debug("Fetch slaves addresses failed", ke);