HBASE-26036 DBB released too early in HRegion.get() and dirty data for some operations (#3436) (#3486)
Signed-off-by: Michael Stack <stack@apache.org>
parent 12f2a16280
commit 692d384ec6
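The underlying bug: cells returned by HRegion.get() can be backed by pooled off-heap DirectByteBuffers (DBBs) that are recycled as soon as the internal RegionScanner is closed, so callers that touch the cells afterwards read recycled ("dirty") memory. The hunks below therefore make internal callers open the scanner themselves and either finish with the cells while it is still open, or deep-copy them onto the heap first. A minimal sketch of that pattern, not part of the commit itself (the helper name readRowSafely is illustrative):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

// Read one row so the cells stay valid: keep the scanner open while they are
// inspected, and deep-copy any buffer-backed cell that must outlive it.
static List<Cell> readRowSafely(HRegion region, Get get) throws IOException {
  List<Cell> heapCells = new ArrayList<>();
  try (RegionScanner scanner = region.getScanner(new Scan(get))) {
    // Don't call HRegion.get() here: once its internal scanner closes, the
    // pooled buffers backing the returned cells may already be reused.
    List<Cell> cells = new ArrayList<>();
    scanner.next(cells);
    for (Cell cell : cells) {
      heapCells.add(cell instanceof ByteBufferExtendedCell
          ? ((ByteBufferExtendedCell) cell).deepClone() : cell);
    }
  }
  return heapCells;
}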
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,13 @@ public class ByteBuffAllocator {

   public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";

+  /**
+   * Set an alternate ByteBuffAllocator by setting this config,
+   * e.g. we can configure {@link DeallocateRewriteByteBuffAllocator} to find
+   * premature-release issues
+   */
+  public static final String BYTEBUFF_ALLOCATOR_CLASS = "hbase.bytebuff.allocator.class";
+
   /**
    * @deprecated since 2.3.0 and will be removed in 4.0.0. Use
    *             {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
@@ -121,8 +129,8 @@
     void free();
   }

-  private final boolean reservoirEnabled;
-  private final int bufSize;
+  protected final boolean reservoirEnabled;
+  protected final int bufSize;
   private final int maxBufCount;
   private final AtomicInteger usedBufCount = new AtomicInteger(0);

@@ -173,7 +181,9 @@
         conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
           HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
       int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
-      return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
+      Class<?> clazz = conf.getClass(BYTEBUFF_ALLOCATOR_CLASS, ByteBuffAllocator.class);
+      return (ByteBuffAllocator) ReflectionUtils
+          .newInstance(clazz, true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
     } else {
       return HEAP;
     }
@@ -188,8 +198,8 @@
     return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
   }

-  ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
-      int minSizeForReservoirUse) {
+  protected ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
+      int minSizeForReservoirUse) {
     this.reservoirEnabled = reservoirEnabled;
     this.maxBufCount = maxBufCount;
     this.bufSize = bufSize;
@@ -381,7 +391,7 @@
    * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after returning it.
    * @param buf ByteBuffer to return.
    */
-  private void putbackBuffer(ByteBuffer buf) {
+  protected void putbackBuffer(ByteBuffer buf) {
     if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
       LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
       return;
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ByteBuffAllocator that rewrites the ByteBuffers right after they are released.
+ * It can be used to test whether backing ByteBuffers are released prematurely.
+ */
+@InterfaceAudience.Private
+public class DeallocateRewriteByteBuffAllocator extends ByteBuffAllocator {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DeallocateRewriteByteBuffAllocator.class);
+
+  DeallocateRewriteByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
+      int minSizeForReservoirUse) {
+    super(reservoirEnabled, maxBufCount, bufSize, minSizeForReservoirUse);
+  }
+
+  @Override
+  protected void putbackBuffer(ByteBuffer buf) {
+    if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
+      LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
+      return;
+    }
+    buf.clear();
+    byte[] tmp = generateTmpBytes(buf.capacity());
+    buf.put(tmp, 0, tmp.length);
+    super.putbackBuffer(buf);
+  }
+
+  private byte[] generateTmpBytes(int length) {
+    StringBuilder result = new StringBuilder();
+    while (result.length() < length) {
+      result.append("-");
+    }
+    return Bytes.toBytes(result.substring(0, length));
+  }
+}
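The allocator above is purely a debugging aid: on free it overwrites the buffer with '-' bytes, so a reader still holding a stale reference sees obviously clobbered data instead of silently wrong values. A sketch of wiring it in through the new config key, assuming a test-style setup (it mirrors the TestCheckAndMutateWithByteBuff configuration further down in this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;

// Enable the pooled allocator and swap in the rewrite-on-free variant.
Configuration conf = HBaseConfiguration.create();
conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
    DeallocateRewriteByteBuffAllocator.class.getName());
conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);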
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateR
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.WrongRegionException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -221,22 +223,27 @@ public class MultiRowMutationEndpoint extends MultiRowMutationService implements
       get.setTimeRange(timeRange.getMin(), timeRange.getMax());
     }

-    List<Cell> result = region.get(get, false);
     boolean matches = false;
-    if (filter != null) {
-      if (!result.isEmpty()) {
-        matches = true;
-      }
-    } else {
-      boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
-      if (result.isEmpty() && valueIsNull) {
-        matches = true;
-      } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
-        matches = true;
-      } else if (result.size() == 1 && !valueIsNull) {
-        Cell kv = result.get(0);
-        int compareResult = PrivateCellUtil.compareValue(kv, comparator);
-        matches = matches(op, compareResult);
+    try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+      // NOTE: Please don't use HRegion.get() instead,
+      // because it will copy cells to heap. See HBASE-26036
+      List<Cell> result = new ArrayList<>();
+      scanner.next(result);
+      if (filter != null) {
+        if (!result.isEmpty()) {
+          matches = true;
+        }
+      } else {
+        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
+        if (result.isEmpty() && valueIsNull) {
+          matches = true;
+        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
+          matches = true;
+        } else if (result.size() == 1 && !valueIsNull) {
+          Cell kv = result.get(0);
+          int compareResult = PrivateCellUtil.compareValue(kv, comparator);
+          matches = matches(op, compareResult);
+        }
       }
     }
     return matches;
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -77,6 +76,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellComparator;
@@ -192,6 +192,9 @@ import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -201,10 +204,26 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

 /**
  * Regions store data for a certain region of a table. It stores all columns
  * for each row. A given table consists of one or more Regions.
@@ -3218,18 +3237,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

   private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, byte[] byteNow)
       throws IOException {
-    List<Cell> result = get(get, false);
+    try (RegionScanner scanner = getScanner(new Scan(get))) {
+      // NOTE: Please don't use HRegion.get() instead,
+      // because it will copy cells to heap. See HBASE-26036
+      List<Cell> result = new ArrayList<>();
+      scanner.next(result);

-    if (result.size() < count) {
-      // Nothing to delete
-      PrivateCellUtil.updateLatestStamp(cell, byteNow);
-      return;
-    }
-    if (result.size() > count) {
-      throw new RuntimeException("Unexpected size: " + result.size());
-    }
-    Cell getCell = result.get(count - 1);
-    PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());
+      if (result.size() < count) {
+        // Nothing to delete
+        PrivateCellUtil.updateLatestStamp(cell, byteNow);
+        return;
+      }
+      if (result.size() > count) {
+        throw new RuntimeException("Unexpected size: " + result.size());
+      }
+      Cell getCell = result.get(count - 1);
+      PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());
+    }
   }

   @Override
@@ -4016,60 +4040,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       get.setTimeRange(tr.getMin(), tr.getMax());
     }

-    List<Cell> currentValues = region.get(get, false);
-
-    // Iterate the input columns and update existing values if they were found, otherwise
-    // add new column initialized to the delta amount
-    int currentValuesIndex = 0;
-    for (int i = 0; i < deltas.size(); i++) {
-      Cell delta = deltas.get(i);
-      Cell currentValue = null;
-      if (currentValuesIndex < currentValues.size() &&
-          CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
-        currentValue = currentValues.get(currentValuesIndex);
-        if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
-          currentValuesIndex++;
-        }
-      }
-      // Switch on whether this is an increment or an append building the new Cell to apply.
-      Cell newCell;
-      if (mutation instanceof Increment) {
-        long deltaAmount = getLongValue(delta);
-        final long newValue = currentValue == null ?
-            deltaAmount : getLongValue(currentValue) + deltaAmount;
-        newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
-            (oldCell) -> Bytes.toBytes(newValue));
-      } else {
-        newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
-            (oldCell) ->
-              ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
-                .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
-                .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
-                .array()
-        );
-      }
-      if (region.maxCellSize > 0) {
-        int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
-        if (newCellSize > region.maxCellSize) {
-          String msg = "Cell with size " + newCellSize + " exceeds limit of "
-              + region.maxCellSize + " bytes in region " + this;
-          LOG.debug(msg);
-          throw new DoNotRetryIOException(msg);
-        }
-      }
-      cellPairs.add(new Pair<>(currentValue, newCell));
-      // Add to results to get returned to the Client. If null, client does not want results.
-      if (results != null) {
-        results.add(newCell);
-      }
-    }
-    // Give coprocessors a chance to update the new cells before apply to WAL or memstore
-    if (region.coprocessorHost != null) {
-      // Here the operation must be increment or append.
-      cellPairs = mutation instanceof Increment ?
-          region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
-          region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
-    }
+    try (RegionScanner scanner = region.getScanner(new Scan(get))) {
+      // NOTE: Please don't use HRegion.get() instead,
+      // because it will copy cells to heap. See HBASE-26036
+      List<Cell> currentValues = new ArrayList<>();
+      scanner.next(currentValues);
+      // Iterate the input columns and update existing values if they were found, otherwise
+      // add new column initialized to the delta amount
+      int currentValuesIndex = 0;
+      for (int i = 0; i < deltas.size(); i++) {
+        Cell delta = deltas.get(i);
+        Cell currentValue = null;
+        if (currentValuesIndex < currentValues.size() && CellUtil
+            .matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
+          currentValue = currentValues.get(currentValuesIndex);
+          if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
+            currentValuesIndex++;
+          }
+        }
+        // Switch on whether this is an increment or an append building the new Cell to apply.
+        Cell newCell;
+        if (mutation instanceof Increment) {
+          long deltaAmount = getLongValue(delta);
+          final long newValue = currentValue == null ? deltaAmount :
+              getLongValue(currentValue) + deltaAmount;
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+              (oldCell) -> Bytes.toBytes(newValue));
+        } else {
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+              (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() +
+                  oldCell.getValueLength()])
+                  .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
+                  .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
+                  .array());
+        }
+        if (region.maxCellSize > 0) {
+          int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
+          if (newCellSize > region.maxCellSize) {
+            String msg =
+                "Cell with size " + newCellSize + " exceeds limit of " + region.maxCellSize +
+                    " bytes in region " + this;
+            LOG.debug(msg);
+            throw new DoNotRetryIOException(msg);
+          }
+        }
+        cellPairs.add(new Pair<>(currentValue, newCell));
+        // Add to results to get returned to the Client. If null, client does not want results.
+        if (results != null) {
+          results.add(newCell);
+        }
+      }
+      // Give coprocessors a chance to update the new cells before apply to WAL or memstore
+      if (region.coprocessorHost != null) {
+        // Here the operation must be increment or append.
+        cellPairs = mutation instanceof Increment ?
+            region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
+            region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
+      }
+    }
     return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
   }
@@ -4822,26 +4850,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // NOTE: We used to wait here until mvcc caught up: mvcc.await();
       // Supposition is that now all changes are done under row locks, then when we go to read,
       // we'll get the latest on this row.
-      List<Cell> result = get(get, false);
       boolean matches = false;
       long cellTs = 0;
-      if (filter != null) {
-        if (!result.isEmpty()) {
-          matches = true;
-          cellTs = result.get(0).getTimestamp();
-        }
-      } else {
-        boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
-        if (result.isEmpty() && valueIsNull) {
-          matches = true;
-        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
-          matches = true;
-          cellTs = result.get(0).getTimestamp();
-        } else if (result.size() == 1 && !valueIsNull) {
-          Cell kv = result.get(0);
-          cellTs = kv.getTimestamp();
-          int compareResult = PrivateCellUtil.compareValue(kv, comparator);
-          matches = matches(op, compareResult);
+      try (RegionScanner scanner = getScanner(new Scan(get))) {
+        // NOTE: Please don't use HRegion.get() instead,
+        // because it will copy cells to heap. See HBASE-26036
+        List<Cell> result = new ArrayList<>(1);
+        scanner.next(result);
+        if (filter != null) {
+          if (!result.isEmpty()) {
+            matches = true;
+            cellTs = result.get(0).getTimestamp();
+          }
+        } else {
+          boolean valueIsNull =
+              comparator.getValue() == null || comparator.getValue().length == 0;
+          if (result.isEmpty() && valueIsNull) {
+            matches = true;
+          } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
+            matches = true;
+            cellTs = result.get(0).getTimestamp();
+          } else if (result.size() == 1 && !valueIsNull) {
+            Cell kv = result.get(0);
+            cellTs = kv.getTimestamp();
+            int compareResult = PrivateCellUtil.compareValue(kv, comparator);
+            matches = matches(op, compareResult);
+          }
        }
      }
@@ -7503,13 +7537,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
       scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
     }
-    RegionScanner scanner = null;
-    try {
-      scanner = getScanner(scan, null, nonceGroup, nonce);
-      scanner.next(results);
-    } finally {
-      if (scanner != null)
-        scanner.close();
+    try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
+      List<Cell> tmp = new ArrayList<>();
+      scanner.next(tmp);
+      // Copy EC to heap, then close the scanner.
+      // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
+      // See more details in HBASE-26036.
+      for (Cell cell : tmp) {
+        results.add(cell instanceof ByteBufferExtendedCell ?
+            ((ByteBufferExtendedCell) cell).deepClone() : cell);
+      }
     }

     // post-get CP hook
|
@ -396,20 +396,24 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
|
|||
}
|
||||
get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags,
|
||||
VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT));
|
||||
List<Cell> result = ctx.getEnvironment().getRegion().get(get, false);
|
||||
try (RegionScanner scanner = ctx.getEnvironment().getRegion().getScanner(new Scan(get))) {
|
||||
// NOTE: Please don't use HRegion.get() instead,
|
||||
// because it will copy cells to heap. See HBASE-26036
|
||||
List<Cell> result = new ArrayList<>();
|
||||
scanner.next(result);
|
||||
|
||||
if (result.size() < get.getMaxVersions()) {
|
||||
// Nothing to delete
|
||||
PrivateCellUtil.updateLatestStamp(cell, byteNow);
|
||||
return;
|
||||
if (result.size() < get.getMaxVersions()) {
|
||||
// Nothing to delete
|
||||
PrivateCellUtil.updateLatestStamp(cell, byteNow);
|
||||
return;
|
||||
}
|
||||
if (result.size() > get.getMaxVersions()) {
|
||||
throw new RuntimeException("Unexpected size: " + result.size() +
|
||||
". Results more than the max versions obtained.");
|
||||
}
|
||||
Cell getCell = result.get(get.getMaxVersions() - 1);
|
||||
PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());
|
||||
}
|
||||
if (result.size() > get.getMaxVersions()) {
|
||||
throw new RuntimeException("Unexpected size: " + result.size()
|
||||
+ ". Results more than the max versions obtained.");
|
||||
}
|
||||
Cell getCell = result.get(get.getMaxVersions() - 1);
|
||||
PrivateCellUtil.setTimestamp(cell, getCell.getTimestamp());
|
||||
|
||||
// We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would
|
||||
// update with the current timestamp after again doing a get. As the hook as already determined
|
||||
// the needed timestamp we need to bypass here.
|
||||
|
|
|
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.Threads.sleep;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestCheckAndMutateWithByteBuff {
+  private static final Logger LOG = LoggerFactory.getLogger(TestCheckAndMutateWithByteBuff.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestCheckAndMutateWithByteBuff.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final Configuration conf = TEST_UTIL.getConfiguration();
+  private static Admin admin = null;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    conf.set(HConstants.REGION_IMPL, TestCheckAndMutateRegion.class.getName());
+    conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
+        DeallocateRewriteByteBuffAllocator.class.getName());
+    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 1);
+    conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
+    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024);
+    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
+    conf.setInt("hbase.client.retries.number", 1);
+    TEST_UTIL.startMiniCluster();
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCheckAndMutateWithByteBuff() throws Exception {
+    Table testTable = createTable(TableName.valueOf(name.getMethodName()));
+    byte[] checkRow = Bytes.toBytes("checkRow");
+    byte[] checkQualifier = Bytes.toBytes("cq");
+    byte[] checkValue = Bytes.toBytes("checkValue");
+
+    Put put = new Put(checkRow);
+    put.addColumn(CF, checkQualifier, checkValue);
+    testTable.put(put);
+    admin.flush(testTable.getName());
+
+    assertTrue(testTable.checkAndMutate(checkRow, CF).qualifier(checkQualifier)
+        .ifEquals(checkValue)
+        .thenPut(new Put(checkRow).addColumn(CF, Bytes.toBytes("q1"),
+            Bytes.toBytes("testValue"))));
+  }
+
+  private Table createTable(TableName tableName)
+      throws IOException {
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).setBlocksize(100).build())
+        .build();
+    return TEST_UTIL.createTable(td, null);
+  }
+
+  /**
+   * An override of HRegion that sleeps after get(), waiting for the release of the DBB
+   */
+  public static class TestCheckAndMutateRegion extends HRegion {
+    public TestCheckAndMutateRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam,
+        RegionInfo info, TableDescriptor htd, RegionServerServices rsServices) {
+      super(tableDir, log, fs, confParam, info, htd, rsServices);
+    }
+
+    public TestCheckAndMutateRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
+        TableDescriptor htd, RegionServerServices rsServices) {
+      super(fs, wal, confParam, htd, rsServices);
+    }
+
+    @Override
+    public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
+      List<Cell> cells = super.get(get, withCoprocessor);
+      sleep(600);
+      return cells;
+    }
+  }
+}
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.TagType.VISIBILITY_TAG_TYPE;
 import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
 import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
 import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL;
-
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,7 +30,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.AuthUtil;
@@ -49,10 +47,12 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
@@ -164,33 +164,7 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer
     assert (labelsRegion != null || systemCall);
     List<String> auths = new ArrayList<>();
     Get get = new Get(user);
-    List<Cell> cells = null;
-    if (labelsRegion == null) {
-      Table table = null;
-      Connection connection = null;
-      try {
-        connection = ConnectionFactory.createConnection(conf);
-        table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME);
-        Result result = table.get(get);
-        cells = result.listCells();
-      } finally {
-        if (table != null) {
-          table.close();
-        }
-        if (connection != null) {
-          connection.close();
-        }
-      }
-    } else {
-      cells = this.labelsRegion.get(get, false);
-    }
-    if (cells != null) {
-      for (Cell cell : cells) {
-        String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
-            cell.getQualifierLength());
-        auths.add(auth);
-      }
-    }
+    getAuths(get, auths);
     return auths;
   }
@@ -201,36 +175,50 @@ public class ExpAsStringVisibilityLabelServiceImpl implements VisibilityLabelSer
     if (groups != null && groups.length > 0) {
       for (String group : groups) {
         Get get = new Get(Bytes.toBytes(AuthUtil.toGroupEntry(group)));
-        List<Cell> cells = null;
-        if (labelsRegion == null) {
-          Table table = null;
-          Connection connection = null;
-          try {
-            connection = ConnectionFactory.createConnection(conf);
-            table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME);
-            Result result = table.get(get);
-            cells = result.listCells();
-          } finally {
-            if (table != null) {
-              table.close();
-              connection.close();
-            }
-          }
-        } else {
-          cells = this.labelsRegion.get(get, false);
-        }
-        if (cells != null) {
-          for (Cell cell : cells) {
-            String auth = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength());
-            auths.add(auth);
-          }
-        }
+        getAuths(get, auths);
       }
     }
     return auths;
   }

+  private void getAuths(Get get, List<String> auths) throws IOException {
+    List<Cell> cells = new ArrayList<>();
+    RegionScanner scanner = null;
+    try {
+      if (labelsRegion == null) {
+        Table table = null;
+        Connection connection = null;
+        try {
+          connection = ConnectionFactory.createConnection(conf);
+          table = connection.getTable(VisibilityConstants.LABELS_TABLE_NAME);
+          Result result = table.get(get);
+          cells = result.listCells();
+        } finally {
+          if (table != null) {
+            table.close();
+          }
+          if (connection != null) {
+            connection.close();
+          }
+        }
+      } else {
+        // NOTE: Please don't use HRegion.get() instead,
+        // because it will copy cells to heap. See HBASE-26036
+        scanner = this.labelsRegion.getScanner(new Scan(get));
+        scanner.next(cells);
+      }
+      for (Cell cell : cells) {
+        String auth = Bytes
+            .toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+        auths.add(auth);
+      }
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }

   @Override
   public List<String> listLabels(String regex) throws IOException {
     // return an empty list for this implementation.