HBASE-26036 DBB released too early in HRegion.get() and dirty data for some operations (#3436)

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Xiaolin Ha 2021-07-14 11:26:38 +08:00 committed by GitHub
parent 68aaf1ff02
commit 0836695459
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 390 additions and 166 deletions

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -70,6 +71,13 @@ public class ByteBuffAllocator {
public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size"; public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";
/**
* Set an alternate bytebuffallocator by setting this config,
* e.g. we can config {@link DeallocateRewriteByteBuffAllocator} to find out
* prematurely release issues
*/
public static final String BYTEBUFF_ALLOCATOR_CLASS = "hbase.bytebuff.allocator.class";
/** /**
* @deprecated since 2.3.0 and will be removed in 4.0.0. Use * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
* {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead. * {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
@ -117,8 +125,8 @@ public class ByteBuffAllocator {
void free(); void free();
} }
private final boolean reservoirEnabled; protected final boolean reservoirEnabled;
private final int bufSize; protected final int bufSize;
private final int maxBufCount; private final int maxBufCount;
private final AtomicInteger usedBufCount = new AtomicInteger(0); private final AtomicInteger usedBufCount = new AtomicInteger(0);
@ -169,7 +177,9 @@ public class ByteBuffAllocator {
conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2); HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6); int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse); Class<?> clazz = conf.getClass(BYTEBUFF_ALLOCATOR_CLASS, ByteBuffAllocator.class);
return (ByteBuffAllocator) ReflectionUtils
.newInstance(clazz, true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
} else { } else {
return HEAP; return HEAP;
} }
@ -184,7 +194,7 @@ public class ByteBuffAllocator {
return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE); return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
} }
ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize, protected ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) { int minSizeForReservoirUse) {
this.reservoirEnabled = reservoirEnabled; this.reservoirEnabled = reservoirEnabled;
this.maxBufCount = maxBufCount; this.maxBufCount = maxBufCount;
@ -377,7 +387,7 @@ public class ByteBuffAllocator {
* Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning. * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
* @param buf ByteBuffer to return. * @param buf ByteBuffer to return.
*/ */
private void putbackBuffer(ByteBuffer buf) { protected void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) { if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored"); LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return; return;

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A ByteBuffAllocator that rewrite the bytebuffers right after released.
* It can be used for test whether there are prematurely releasing backing bytebuffers.
*/
@InterfaceAudience.Private
public class DeallocateRewriteByteBuffAllocator extends ByteBuffAllocator {
private static final Logger LOG = LoggerFactory.getLogger(
DeallocateRewriteByteBuffAllocator.class);
DeallocateRewriteByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) {
super(reservoirEnabled, maxBufCount, bufSize, minSizeForReservoirUse);
}
@Override
protected void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
}
buf.clear();
byte[] tmp = generateTmpBytes(buf.capacity());
buf.put(tmp, 0, tmp.length);
super.putbackBuffer(buf);
}
private byte[] generateTmpBytes(int length) {
StringBuilder result = new StringBuilder();
while (result.length() < length) {
result.append("-");
}
return Bytes.toBytes(result.substring(0, length));
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.WrongRegionException; import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -221,8 +223,12 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
get.setTimeRange(timeRange.getMin(), timeRange.getMax()); get.setTimeRange(timeRange.getMin(), timeRange.getMax());
} }
List<Cell> result = region.get(get, false);
boolean matches = false; boolean matches = false;
try (RegionScanner scanner = region.getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<Cell> result = new ArrayList<>();
scanner.next(result);
if (filter != null) { if (filter != null) {
if (!result.isEmpty()) { if (!result.isEmpty()) {
matches = true; matches = true;
@ -239,6 +245,7 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
matches = matches(op, compareResult); matches = matches(op, compareResult);
} }
} }
}
return matches; return matches;
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import edu.umd.cs.findbugs.annotations.Nullable; import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import java.io.EOFException; import java.io.EOFException;
@ -74,6 +73,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -176,7 +176,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -191,7 +190,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@ -3244,7 +3242,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, byte[] byteNow) private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, byte[] byteNow)
throws IOException { throws IOException {
List<Cell> result = get(get, false); try (RegionScanner scanner = getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<Cell> result = new ArrayList<>();
scanner.next(result);
if (result.size() < count) { if (result.size() < count) {
// Nothing to delete // Nothing to delete
@ -3257,6 +3259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Cell getCell = result.get(count - 1); Cell getCell = result.get(count - 1);
PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());
} }
}
@Override @Override
public void put(Put put) throws IOException { public void put(Put put) throws IOException {
@ -4044,16 +4047,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
get.setTimeRange(tr.getMin(), tr.getMax()); get.setTimeRange(tr.getMin(), tr.getMax());
} }
List<Cell> currentValues = region.get(get, false); try (RegionScanner scanner = region.getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<Cell> currentValues = new ArrayList<>();
scanner.next(currentValues);
// Iterate the input columns and update existing values if they were found, otherwise // Iterate the input columns and update existing values if they were found, otherwise
// add new column initialized to the delta amount // add new column initialized to the delta amount
int currentValuesIndex = 0; int currentValuesIndex = 0;
for (int i = 0; i < deltas.size(); i++) { for (int i = 0; i < deltas.size(); i++) {
Cell delta = deltas.get(i); Cell delta = deltas.get(i);
Cell currentValue = null; Cell currentValue = null;
if (currentValuesIndex < currentValues.size() && if (currentValuesIndex < currentValues.size() && CellUtil
CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) { .matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
currentValue = currentValues.get(currentValuesIndex); currentValue = currentValues.get(currentValuesIndex);
if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) { if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
currentValuesIndex++; currentValuesIndex++;
@ -4063,24 +4069,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Cell newCell; Cell newCell;
if (mutation instanceof Increment) { if (mutation instanceof Increment) {
long deltaAmount = getLongValue(delta); long deltaAmount = getLongValue(delta);
final long newValue = currentValue == null ? final long newValue = currentValue == null ? deltaAmount :
deltaAmount : getLongValue(currentValue) + deltaAmount; getLongValue(currentValue) + deltaAmount;
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
(oldCell) -> Bytes.toBytes(newValue)); (oldCell) -> Bytes.toBytes(newValue));
} else { } else {
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
(oldCell) -> (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() +
ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) oldCell.getValueLength()])
.put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
.put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength()) .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
.array() .array());
);
} }
if (region.maxCellSize > 0) { if (region.maxCellSize > 0) {
int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell); int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
if (newCellSize > region.maxCellSize) { if (newCellSize > region.maxCellSize) {
String msg = "Cell with size " + newCellSize + " exceeds limit of " String msg =
+ region.maxCellSize + " bytes in region " + this; "Cell with size " + newCellSize + " exceeds limit of " + region.maxCellSize +
" bytes in region " + this;
LOG.debug(msg); LOG.debug(msg);
throw new DoNotRetryIOException(msg); throw new DoNotRetryIOException(msg);
} }
@ -4098,6 +4104,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) : region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs); region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
} }
}
return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList()); return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
} }
@ -4858,16 +4865,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// NOTE: We used to wait here until mvcc caught up: mvcc.await(); // NOTE: We used to wait here until mvcc caught up: mvcc.await();
// Supposition is that now all changes are done under row locks, then when we go to read, // Supposition is that now all changes are done under row locks, then when we go to read,
// we'll get the latest on this row. // we'll get the latest on this row.
List<Cell> result = get(get, false);
boolean matches = false; boolean matches = false;
long cellTs = 0; long cellTs = 0;
try (RegionScanner scanner = getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<Cell> result = new ArrayList<>(1);
scanner.next(result);
if (filter != null) { if (filter != null) {
if (!result.isEmpty()) { if (!result.isEmpty()) {
matches = true; matches = true;
cellTs = result.get(0).getTimestamp(); cellTs = result.get(0).getTimestamp();
} }
} else { } else {
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean valueIsNull =
comparator.getValue() == null || comparator.getValue().length == 0;
if (result.isEmpty() && valueIsNull) { if (result.isEmpty() && valueIsNull) {
matches = true; matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
@ -4880,6 +4892,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
matches = matches(op, compareResult); matches = matches(op, compareResult);
} }
} }
}
// If matches, perform the mutation or the rowMutations // If matches, perform the mutation or the rowMutations
if (matches) { if (matches) {
@ -7558,7 +7571,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault()); scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
} }
try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) { try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
scanner.next(results); List<Cell> tmp = new ArrayList<>();
scanner.next(tmp);
// Copy EC to heap, then close the scanner.
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
// See more details in HBASE-26036.
for (Cell cell : tmp) {
results.add(cell instanceof ByteBufferExtendedCell ?
((ByteBufferExtendedCell) cell).deepClone(): cell);
}
} }
// post-get CP hook // post-get CP hook

View File

@ -397,7 +397,11 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
} }
get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags, get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags,
VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT)); VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT));
List<Cell> result = ctx.getEnvironment().getRegion().get(get, false); try (RegionScanner scanner = ctx.getEnvironment().getRegion().getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<Cell> result = new ArrayList<>();
scanner.next(result);
if (result.size() < get.getMaxVersions()) { if (result.size() < get.getMaxVersions()) {
// Nothing to delete // Nothing to delete
@ -405,12 +409,12 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
return; return;
} }
if (result.size() > get.getMaxVersions()) { if (result.size() > get.getMaxVersions()) {
throw new RuntimeException("Unexpected size: " + result.size() throw new RuntimeException("Unexpected size: " + result.size() +
+ ". Results more than the max versions obtained."); ". Results more than the max versions obtained.");
} }
Cell getCell = result.get(get.getMaxVersions() - 1); Cell getCell = result.get(get.getMaxVersions() - 1);
PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp()); PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());
}
// We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would // We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would
// update with the current timestamp after again doing a get. As the hook as already determined // update with the current timestamp after again doing a get. As the hook as already determined
// the needed timestamp we need to bypass here. // the needed timestamp we need to bypass here.

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.Threads.sleep;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category(LargeTests.class)
public class TestCheckAndMutateWithByteBuff {
private static final Logger LOG = LoggerFactory.getLogger(TestCheckAndMutateWithByteBuff.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCheckAndMutateWithByteBuff.class);
@Rule
public TestName name = new TestName();
private static final byte[] CF = Bytes.toBytes("CF");
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Configuration conf = TEST_UTIL.getConfiguration();
private static Admin admin = null;
@BeforeClass
public static void setupBeforeClass() throws Exception {
conf.set(HConstants.REGION_IMPL, TestCheckAndMutateRegion.class.getName());
conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
DeallocateRewriteByteBuffAllocator.class.getName());
conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 1);
conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024);
conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
conf.setInt("hbase.client.retries.number", 1);
TEST_UTIL.startMiniCluster();
admin = TEST_UTIL.getAdmin();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testCheckAndMutateWithByteBuff() throws Exception {
Table testTable = createTable(TableName.valueOf(name.getMethodName()));
byte[] checkRow = Bytes.toBytes("checkRow");
byte[] checkQualifier = Bytes.toBytes("cq");
byte[] checkValue = Bytes.toBytes("checkValue");
Put put = new Put(checkRow);
put.addColumn(CF, checkQualifier, checkValue);
testTable.put(put);
admin.flush(testTable.getName());
assertTrue(testTable.checkAndMutate(checkRow, CF).qualifier(checkQualifier).
ifEquals(checkValue)
.thenPut(new Put(checkRow).addColumn(CF, Bytes.toBytes("q1"),
Bytes.toBytes("testValue"))));
}
private Table createTable(TableName tableName)
throws IOException {
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).setBlocksize(100).build())
.build();
return TEST_UTIL.createTable(td, null);
}
/**
* An override of HRegion to allow sleep after get(), waiting for the release of DBB
*/
public static class TestCheckAndMutateRegion extends HRegion {
public TestCheckAndMutateRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam,
RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
public TestCheckAndMutateRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
TableDescriptor htd, RegionServerServices rsServices) {
super(fs, wal, confParam, htd, rsServices);
}
@Override
public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
List<Cell> cells = super.get(get, withCoprocessor);
sleep(600);
return cells;
}
}
}

View File

@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.TagType.VISIBILITY_TAG_TYPE;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY; import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL; import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -31,7 +30,6 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.AuthUtil;
@ -49,10 +47,12 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode; import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
@ -164,7 +164,27 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer
assert (labelsRegion != null || systemCall); assert (labelsRegion != null || systemCall);
List<String> auths = new ArrayList<>(); List<String> auths = new ArrayList<>();
Get get = new Get(user); Get get = new Get(user);
List<Cell> cells = null; getAuths(get, auths);
return auths;
}
@Override
public List<String> getGroupAuths(String[] groups, boolean systemCall) throws IOException {
assert (labelsRegion != null || systemCall);
List<String> auths = new ArrayList<>();
if (groups != null && groups.length > 0) {
for (String group : groups) {
Get get = new Get(Bytes.toBytes(AuthUtil.toGroupEntry(group)));
getAuths(get, auths);
}
}
return auths;
}
private void getAuths(Get get, List<String> auths) throws IOException {
List<Cell> cells = new ArrayList<>();
RegionScanner scanner = null;
try {
if (labelsRegion == null) { if (labelsRegion == null) {
Table table = null; Table table = null;
Connection connection = null; Connection connection = null;
@ -182,53 +202,21 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer
} }
} }
} else { } else {
cells = this.labelsRegion.get(get, false); // NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
scanner = this.labelsRegion.getScanner(new Scan(get));
scanner.next(cells);
} }
if (cells != null) {
for (Cell cell : cells) { for (Cell cell : cells) {
String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), String auth = Bytes
cell.getQualifierLength()); .toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
auths.add(auth); auths.add(auth);
} }
}
return auths;
}
@Override
public List<String> getGroupAuths(String[] groups, boolean systemCall) throws IOException {
assert (labelsRegion != null || systemCall);
List<String> auths = new ArrayList<>();
if (groups != null && groups.length > 0) {
for (String group : groups) {
Get get = new Get(Bytes.toBytes(AuthUtil.toGroupEntry(group)));
List<Cell> cells = null;
if (labelsRegion == null) {
Table table = null;
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(conf);
table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME);
Result result = table.get(get);
cells = result.listCells();
} finally { } finally {
if (table != null) { if (scanner != null) {
table.close(); scanner.close();
connection.close();
} }
} }
} else {
cells = this.labelsRegion.get(get, false);
}
if (cells != null) {
for (Cell cell : cells) {
String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
auths.add(auth);
}
}
}
}
return auths;
} }
@Override @Override