HBASE-16372 References to previous cell in read path should be avoided

(Ram)
Ramkrishna 2016-10-06 14:19:12 +05:30
parent eb33b60a95
commit 58e843dae2
24 changed files with 697 additions and 47 deletions

View File

@ -19,12 +19,17 @@
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@Category({ SmallTests.class, ClientTests.class })
public class TestPut {
@Test
public void testCopyConstructor() {

View File

@ -19,13 +19,17 @@
package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import static org.junit.Assert.*;
@SuppressWarnings("ThrowableInstanceNeverThrown")
@Category({ SmallTests.class, ClientTests.class })
public class TestClientExceptionsUtil {
@Test

View File

@ -97,7 +97,7 @@ public class KeyValueUtil {
}
/**************** copy key only *********************/
/**************** copy the cell to create a new keyvalue *********************/
public static KeyValue copyToNewKeyValue(final Cell cell) {
byte[] bytes = copyToNewByteArray(cell);
@ -118,6 +118,21 @@ public class KeyValueUtil {
return buffer;
}
/**
* Copies only the key part of the incoming cell into a new KeyValue
* @param cell the cell whose key is copied
* @return a KeyValue that consists of only the key part of the incoming cell
*/
public static KeyValue toNewKeyCell(final Cell cell) {
byte[] bytes = new byte[keyLength(cell)];
appendKeyTo(cell, bytes, 0);
KeyValue kv = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
// Set the seq id. The new key cell could be used in comparisons so it
// is important that it uses the seqid also. If not, the comparison would fail
kv.setSequenceId(cell.getSequenceId());
return kv;
}
public static byte[] copyToNewByteArray(final Cell cell) {
int v1Length = length(cell);
byte[] backingBytes = new byte[v1Length];
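
A hedged usage sketch of the new helper (the variable name below is illustrative, not part of the patch): a cell that is still backed by a cached block can be reduced to a self-contained, key-only copy, so the block no longer needs to stay referenced.

// Hypothetical usage; cellFromBlock stands for any Cell still backed by a cached block.
Cell keyOnly = KeyValueUtil.toNewKeyCell(cellFromBlock);            // copies key bytes + sequence id only
KeyValue fullCopy = KeyValueUtil.copyToNewKeyValue(cellFromBlock);  // copies key and value
// Both copies own their byte[] and stay valid after the original block is evicted or released.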

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -29,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
@ -60,6 +60,8 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
/** The size of individual Bloom filter chunks to create */
private int chunkByteSize;
/** The prev Cell that was processed */
private Cell prevCell;
/** A Bloom filter chunk enqueued for writing */
private static class ReadyChunk {
@ -159,7 +161,7 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
}
@Override
public void add(Cell cell) {
public void append(Cell cell) throws IOException {
if (cell == null)
throw new NullPointerException();
@ -181,9 +183,22 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
}
chunk.add(cell);
this.prevCell = cell;
++totalKeyCount;
}
@Override
public void beforeShipped() throws IOException {
if (this.prevCell != null) {
this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
}
}
@Override
public Cell getPrevCell() {
return this.prevCell;
}
private void allocateNewChunk() {
if (prevChunk == null) {
// First chunk

View File

@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@ -194,15 +196,13 @@ public class HFile {
}
/** API required to write an {@link HFile} */
public interface Writer extends Closeable {
public interface Writer extends Closeable, CellSink, ShipperListener {
/** Max memstore (mvcc) timestamp in FileInfo */
public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
/** Add an element to the file info map. */
void appendFileInfo(byte[] key, byte[] value) throws IOException;
void append(Cell cell) throws IOException;
/** @return the path to this {@link HFile} */
Path getPath();

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.CellComparator.MetaCellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.compress.Compression;
@ -701,6 +702,20 @@ public class HFileWriterImpl implements HFile.Writer {
}
}
@Override
public void beforeShipped() throws IOException {
// Clone the cells that this writer still references so that shipped blocks can be released
if (this.lastCell != null) {
this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
}
if (this.firstCellInBlock != null) {
this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
}
if (this.lastCellOfPreviousBlock != null) {
this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
}
}
protected void finishFileInfo() throws IOException {
if (lastCell != null) {
// Make a copy. The copy is stuffed into our fileinfo map. Needs a clean

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

View File

@ -26,13 +26,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
import org.apache.hadoop.hbase.regionserver.CellSink;
/**
* Base class for cell sink that separates the provided cells into multiple files.
*/
@InterfaceAudience.Private
public abstract class AbstractMultiFileWriter implements CellSink {
public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener {
private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class);
@ -116,4 +116,13 @@ public abstract class AbstractMultiFileWriter implements CellSink {
*/
protected void preCloseWriter(StoreFileWriter writer) throws IOException {
}
@Override
public void beforeShipped() throws IOException {
if (this.writers() != null) {
for (StoreFileWriter writer : writers()) {
writer.beforeShipped();
}
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
/**
* A sink to which cells can be appended via {@link #append(Cell)}.
* {@link org.apache.hadoop.hbase.io.hfile.HFile.Writer},
* {@link StoreFileWriter}, {@link AbstractMultiFileWriter},
* {@link BloomFilterWriter} are some implementors of this.
*/
@InterfaceAudience.Private
public interface CellSink {
/**
* Append the given cell
* @param cell the cell to be added
* @throws IOException
*/
void append(Cell cell) throws IOException;
}
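
For illustration only (not part of this commit, and the class name is invented), a minimal CellSink could look like this:

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.CellSink;

// Hypothetical example: a sink that just counts the cells appended to it.
public class CountingCellSink implements CellSink {
  private long count = 0;

  @Override
  public void append(Cell cell) throws IOException {
    if (cell == null) {
      throw new NullPointerException("cell must not be null");
    }
    count++;
  }

  public long getCount() {
    return count;
  }
}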

View File

@ -971,6 +971,8 @@ public class HStore implements Store {
* @param includesTag - includesTag or not
* @return Writer for a new StoreFile in the tmp dir.
*/
// TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
// compaction
@Override
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Implementors of this interface are the ones that need to take some action when
* {@link Shipper#shipped()} is called
*/
@InterfaceAudience.Private
public interface ShipperListener {
/**
* The action to be performed before {@link Shipper#shipped()} is called
* @throws IOException
*/
void beforeShipped() throws IOException;
}
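
The pattern this commit applies in HFileWriterImpl, StoreFileWriter and the bloom writers can be summarized in a hedged sketch (class and field names below are invented for illustration): a writer keeps a reference to the last appended cell and replaces it with a deep copy when beforeShipped() fires, so the scanner is free to release the underlying blocks.

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.ShipperListener;

// Hypothetical sketch of the clone-on-beforeShipped pattern used by the patched writers.
public class LastKeyTrackingSink implements CellSink, ShipperListener {
  // May point into a SHARED cached block between shipped() calls.
  private Cell lastCell;

  @Override
  public void append(Cell cell) throws IOException {
    this.lastCell = cell; // retained for ordering checks, index keys, bloom keys, etc.
  }

  @Override
  public void beforeShipped() throws IOException {
    if (this.lastCell != null) {
      // Deep-copy the key so the block backing the original cell can be evicted safely.
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
  }
}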

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@ -51,7 +50,7 @@ import com.google.common.base.Preconditions;
* local because it is an implementation detail of the HBase regionserver.
*/
@InterfaceAudience.Private
public class StoreFileWriter implements Compactor.CellSink {
public class StoreFileWriter implements CellSink, ShipperListener {
private static final Log LOG = LogFactory.getLog(StoreFileWriter.class.getName());
private final BloomFilterWriter generalBloomFilterWriter;
@ -120,6 +119,7 @@ public class StoreFileWriter implements Compactor.CellSink {
// it no longer writable.
this.timeRangeTrackerSet = trt != null;
this.timeRangeTracker = this.timeRangeTrackerSet? trt: new TimeRangeTracker();
// TODO : Change all writers to be specifically created for compaction context
writer = HFile.getWriterFactory(conf, cacheConf)
.withPath(fs, path)
.withComparator(comparator)
@ -140,10 +140,10 @@ public class StoreFileWriter implements Compactor.CellSink {
// init bloom context
switch (bloomType) {
case ROW:
bloomContext = new RowBloomContext(generalBloomFilterWriter);
bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator);
break;
case ROWCOL:
bloomContext = new RowColBloomContext(generalBloomFilterWriter);
bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator);
break;
default:
throw new IOException(
@ -160,7 +160,7 @@ public class StoreFileWriter implements Compactor.CellSink {
this.deleteFamilyBloomFilterWriter = BloomFilterFactory
.createDeleteBloomAtWrite(conf, cacheConf,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter);
deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter, comparator);
} else {
deleteFamilyBloomFilterWriter = null;
}
@ -251,6 +251,7 @@ public class StoreFileWriter implements Compactor.CellSink {
}
}
@Override
public void append(final Cell cell) throws IOException {
appendGeneralBloomfilter(cell);
appendDeleteFamilyBloomFilter(cell);
@ -258,6 +259,19 @@ public class StoreFileWriter implements Compactor.CellSink {
trackTimestamps(cell);
}
@Override
public void beforeShipped() throws IOException {
// For now these writers will always implement ShipperListener.
// TODO : Change all writers to be specifically created for compaction context
writer.beforeShipped();
if (generalBloomFilterWriter != null) {
generalBloomFilterWriter.beforeShipped();
}
if (deleteFamilyBloomFilterWriter != null) {
deleteFamilyBloomFilterWriter.beforeShipped();
}
}
public Path getPath() {
return this.writer.getPath();
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -111,7 +110,7 @@ abstract class StoreFlusher {
* @param smallestReadPoint Smallest read point used for the flush.
* @param throughputController A controller to avoid flush too fast
*/
protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
protected void performFlush(InternalScanner scanner, CellSink sink,
long smallestReadPoint, ThroughputController throughputController) throws IOException {
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
@ -991,6 +992,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public void shipped() throws IOException {
if (prevCell != null) {
// Do the copy here so that, in case the prevCell ref is pointing to previous
// blocks, we can safely release those blocks.
// This applies to blocks obtained from the Bucket cache, the L1 cache and blocks
// fetched from HDFS. Copying ensures that we let go of the references to these
// blocks so that they can be GCed safely (in the case of the bucket cache).
prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
}
matcher.beforeShipped();
for (KeyValueHeap h : this.heapsForDelayedClose) {
h.close();// There wont be further fetch of Cells from these scanners. Just close.
}

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -39,11 +38,13 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@ -91,10 +91,6 @@ public abstract class Compactor<T extends CellSink> {
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
}
public interface CellSink {
void append(Cell cell) throws IOException;
}
protected interface CellSinkFactory<S> {
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
throws IOException;
@ -391,6 +387,7 @@ public abstract class Compactor<T extends CellSink> {
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForCloseCheck = 0;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
@ -443,6 +440,9 @@ public abstract class Compactor<T extends CellSink> {
}
}
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
// Clone the cells retained by the writer so that any block references they are
// holding can be released.
((ShipperListener)writer).beforeShipped();
// The SHARED block references, being read for compaction, will be kept in prevBlocks
// list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells
// being returned to client, we will call shipped() which can clear this list. Here by
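
Paraphrased as a hedged sketch (assuming, as earlier in this method, that kvs is the Shipper-capable scanner and that shipped() is called once the writer has cloned its cells), the surrounding flow is roughly:

// Paraphrased sketch, not the literal Compactor code:
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
  // 1. Let the writer deep-copy any cells it still references ...
  ((ShipperListener) writer).beforeShipped();
  // 2. ... then let the scanner release the blocks it has accumulated so far.
  kvs.shipped();
  bytesWrittenProgressForShippedCall = 0;
}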

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
/**
@ -50,7 +51,7 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchC
* This class is NOT thread-safe as queries are never multi-threaded
*/
@InterfaceAudience.Private
public interface ColumnTracker {
public interface ColumnTracker extends ShipperListener {
/**
* Checks if the column is present in the list of requested columns by returning the match code

View File

@ -248,4 +248,9 @@ public class ExplicitColumnTracker implements ColumnTracker {
public boolean isDone(long timestamp) {
return minVersions <= 0 && isExpired(timestamp);
}
@Override
public void beforeShipped() throws IOException {
// do nothing
}
}

View File

@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
import org.apache.hadoop.hbase.util.Bytes;
@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* A query matcher that is specifically designed for the scan case.
*/
@InterfaceAudience.Private
public abstract class ScanQueryMatcher {
public abstract class ScanQueryMatcher implements ShipperListener {
/**
* {@link #match} return codes. These instruct the scanner moving through memstores and StoreFiles
@ -334,6 +335,16 @@ public abstract class ScanQueryMatcher {
*/
public abstract Cell getNextKeyHint(Cell cell) throws IOException;
@Override
public void beforeShipped() throws IOException {
if (this.currentRow != null) {
this.currentRow = CellUtil.createFirstOnRow(CellUtil.copyRow(this.currentRow));
}
if (columns != null) {
columns.beforeShipped();
}
}
protected static DeleteTracker instantiateDeleteTracker(RegionCoprocessorHost host)
throws IOException {
DeleteTracker tracker = new ScanDeleteTracker();

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@ -193,6 +194,13 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
return MatchCode.SEEK_NEXT_COL;
}
@Override
public void beforeShipped() {
if (columnCell != null) {
this.columnCell = KeyValueUtil.toNewKeyCell(this.columnCell);
}
}
public boolean isDone(long timestamp) {
return minVersions <= 0 && isExpired(timestamp);
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFile;
@ -30,17 +31,16 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
@InterfaceAudience.Private
public abstract class BloomContext {
// TODO : Avoid holding references to lastCell
protected Cell lastCell;
protected BloomFilterWriter bloomFilterWriter;
protected CellComparator comparator;
public BloomContext(BloomFilterWriter bloomFilterWriter) {
public BloomContext(BloomFilterWriter bloomFilterWriter, CellComparator comparator) {
this.bloomFilterWriter = bloomFilterWriter;
this.comparator = comparator;
}
public Cell getLastCell() {
return this.lastCell;
return this.bloomFilterWriter.getPrevCell();
}
/**
@ -51,8 +51,17 @@ public abstract class BloomContext {
public void writeBloom(Cell cell) throws IOException {
// only add to the bloom filter on a new, unique key
if (isNewKey(cell)) {
bloomFilterWriter.add(cell);
this.lastCell = cell;
sanityCheck(cell);
bloomFilterWriter.append(cell);
}
}
private void sanityCheck(Cell cell) throws IOException {
if (this.getLastCell() != null) {
if (comparator.compare(cell, this.getLastCell()) <= 0) {
throw new IOException("Added a key not lexically larger than" + " previous. Current cell = "
+ cell + ", prevCell = " + this.getLastCell());
}
}
}
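
As a hedged usage sketch (bw and the cells below are hypothetical), the new sanity check makes out-of-order appends fail fast instead of silently producing a broken bloom filter:

// Hypothetical usage; bw is some BloomFilterWriter, cells are named by their row ordering.
RowBloomContext ctx = new RowBloomContext(bw, CellComparator.COMPARATOR);
ctx.writeBloom(cellRowA);      // new key: sanity-checked and appended
ctx.writeBloom(cellRowAagain); // same row: isNewKey() is false, nothing appended
ctx.writeBloom(cellRowB);      // larger row: appended
// ctx.writeBloom(cellRowA) at this point would throw IOException from sanityCheck()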

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.io.Writable;
/**
@ -28,7 +30,7 @@ import org.apache.hadoop.io.Writable;
* resulting Bloom filter as a sequence of bytes.
*/
@InterfaceAudience.Private
public interface BloomFilterWriter extends BloomFilterBase {
public interface BloomFilterWriter extends BloomFilterBase, CellSink, ShipperListener {
/** Compact the Bloom filter before writing metadata &amp; data to disk. */
void compactBloom();
@ -48,8 +50,8 @@ public interface BloomFilterWriter extends BloomFilterBase {
Writable getDataWriter();
/**
* Add the specified binary to the bloom filter.
* @param cell the cell data to be added to the bloom
* Returns the previous cell written by this writer
* @return the previous cell
*/
void add(Cell cell);
Cell getPrevCell();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
@ -31,21 +32,21 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
@InterfaceAudience.Private
public class RowBloomContext extends BloomContext {
public RowBloomContext(BloomFilterWriter bloomFilterWriter) {
super(bloomFilterWriter);
public RowBloomContext(BloomFilterWriter bloomFilterWriter, CellComparator comparator) {
super(bloomFilterWriter, comparator);
}
public void addLastBloomKey(Writer writer) throws IOException {
if (lastCell != null) {
byte[] key = CellUtil.copyRow(this.lastCell);
if (this.getLastCell() != null) {
byte[] key = CellUtil.copyRow(this.getLastCell());
writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, key);
}
}
@Override
protected boolean isNewKey(Cell cell) {
if (this.lastCell != null) {
return !CellUtil.matchingRows(cell, this.lastCell);
if (this.getLastCell() != null) {
return !CellUtil.matchingRows(cell, this.getLastCell());
}
return true;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
@ -32,14 +33,14 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
@InterfaceAudience.Private
public class RowColBloomContext extends BloomContext {
public RowColBloomContext(BloomFilterWriter generalBloomFilterWriter) {
super(generalBloomFilterWriter);
public RowColBloomContext(BloomFilterWriter generalBloomFilterWriter, CellComparator comparator) {
super(generalBloomFilterWriter, comparator);
}
@Override
public void addLastBloomKey(Writer writer) throws IOException {
if (this.lastCell != null) {
Cell firstOnRow = CellUtil.createFirstOnRowCol(this.lastCell);
if (this.getLastCell() != null) {
Cell firstOnRow = CellUtil.createFirstOnRowCol(this.getLastCell());
// This copy happens only once when the writer is closed
byte[] key = CellUtil.getCellKeySerializedAsKeyValueKey(firstOnRow);
writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, key);
@ -48,8 +49,8 @@ public class RowColBloomContext extends BloomContext {
@Override
protected boolean isNewKey(Cell cell) {
if (this.lastCell != null) {
return !CellUtil.matchingRowColumn(cell, this.lastCell);
if (this.getLastCell() != null) {
return !CellUtil.matchingRowColumn(cell, this.getLastCell());
}
return true;
}

View File

@ -0,0 +1,447 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestAvoidCellReferencesIntoShippedBlocks {
private static final Log LOG = LogFactory.getLog(TestAvoidCellReferencesIntoShippedBlocks.class);
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
static byte[][] ROWS = new byte[2][];
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[] ROW1 = Bytes.toBytes("testRow1");
private static byte[] ROW2 = Bytes.toBytes("testRow2");
private static byte[] ROW3 = Bytes.toBytes("testRow3");
private static byte[] ROW4 = Bytes.toBytes("testRow4");
private static byte[] ROW5 = Bytes.toBytes("testRow5");
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[][] FAMILIES_1 = new byte[1][0];
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
private static byte[] data = new byte[1000];
protected static int SLAVES = 1;
private CountDownLatch latch = new CountDownLatch(1);
private static CountDownLatch compactReadLatch = new CountDownLatch(1);
private static AtomicBoolean doScan = new AtomicBoolean(false);
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
ROWS[0] = ROW;
ROWS[1] = ROW1;
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName());
conf.setBoolean("hbase.table.sanity.checks", true); // enable for below
// tests
conf.setInt("hbase.regionserver.handler.count", 20);
conf.setInt("hbase.bucketcache.size", 400);
conf.setStrings("hbase.bucketcache.ioengine", "offheap");
conf.setInt("hbase.hstore.compactionThreshold", 7);
conf.setFloat("hfile.block.cache.size", 0.2f);
conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
FAMILIES_1[0] = FAMILY;
TEST_UTIL.startMiniCluster(SLAVES);
compactReadLatch = new CountDownLatch(1);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testHBase16372InCompactionWritePath() throws Exception {
TableName tableName = TableName.valueOf("testHBase16372InCompactionWritePath");
// Create a table with block size as 1024
final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
CompactorRegionObserver.class.getName());
try {
// get the block cache and region
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
Region region =
TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
Store store = region.getStores().iterator().next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
final BlockCache cache = cacheConf.getBlockCache();
// insert data. 6 rows are added
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW2);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW2);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
put = new Put(ROW3);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW3);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW4);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
put = new Put(ROW4);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW5);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW5);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
// Load cache
Scan s = new Scan();
s.setMaxResultSize(1000);
ResultScanner scanner = table.getScanner(s);
int count = 0;
for (Result result : scanner) {
count++;
}
assertEquals("Count all the rows ", count, 6);
// all the cache is loaded
// trigger a major compaction
ScannerThread scannerThread = new ScannerThread(table, cache);
scannerThread.start();
region.compact(true);
s = new Scan();
s.setMaxResultSize(1000);
scanner = table.getScanner(s);
count = 0;
for (Result result : scanner) {
count++;
}
assertEquals("Count all the rows ", count, 6);
} finally {
table.close();
}
}
private static class ScannerThread extends Thread {
private final Table table;
private final BlockCache cache;
public ScannerThread(Table table, BlockCache cache) {
this.table = table;
this.cache = cache;
}
public void run() {
Scan s = new Scan();
s.setCaching(1);
s.setStartRow(ROW4);
s.setStopRow(ROW5);
try {
while(!doScan.get()) {
try {
// Sleep till you start scan
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
List<BlockCacheKey> cacheList = new ArrayList<BlockCacheKey>();
Iterator<CachedBlock> iterator = cache.iterator();
// evict all the blocks
while (iterator.hasNext()) {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
cacheList.add(cacheKey);
// evict whatever is available
cache.evictBlock(cacheKey);
}
ResultScanner scanner = table.getScanner(s);
for (Result res : scanner) {
}
compactReadLatch.countDown();
} catch (IOException e) {
}
}
}
public static class CompactorRegionObserver extends BaseRegionObserver {
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s) throws IOException {
return createCompactorScanner(store, scanners, scanType, earliestPutTs);
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
return createCompactorScanner(store, scanners, scanType, earliestPutTs);
}
private InternalScanner createCompactorScanner(Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs)
throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
return new CompactorStoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
store.getSmallestReadPoint(), earliestPutTs);
}
}
private static class CompactorStoreScanner extends StoreScanner {
public CompactorStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
}
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
boolean next = super.next(outResult, scannerContext);
for (Cell cell : outResult) {
if(CellComparator.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
try {
// hold the compaction
// set doScan to true
doScan.compareAndSet(false, true);
compactReadLatch.await();
} catch (InterruptedException e) {
}
}
}
return next;
}
}
@Test
public void testHBASE16372InReadPath() throws Exception {
TableName tableName = TableName.valueOf("testHBASE16372");
// Create a table with block size as 1024
final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null);
try {
// get the block cache and region
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
Region region =
TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
Store store = region.getStores().iterator().next();
CacheConfig cacheConf = store.getCacheConfig();
cacheConf.setCacheDataOnWrite(true);
cacheConf.setEvictOnClose(true);
final BlockCache cache = cacheConf.getBlockCache();
// insert data. 6 rows are added
Put put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW1);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW2);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW2);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW3);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW3);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW4);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW4);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
put = new Put(ROW5);
put.addColumn(FAMILY, QUALIFIER, data);
table.put(put);
put = new Put(ROW5);
put.addColumn(FAMILY, QUALIFIER1, data);
table.put(put);
// data was in memstore so don't expect any changes
region.flush(true);
// Load cache
Scan s = new Scan();
s.setMaxResultSize(1000);
ResultScanner scanner = table.getScanner(s);
int count = 0;
for (Result result : scanner) {
count++;
}
assertEquals("Count all the rows ", count, 6);
// Scan from cache
s = new Scan();
// Start a scan from row3
s.setCaching(1);
s.setStartRow(ROW1);
// set partial as true so that the scan can send partial columns also
s.setAllowPartialResults(true);
s.setMaxResultSize(1000);
scanner = table.getScanner(s);
Thread evictorThread = new Thread() {
@Override
public void run() {
List<BlockCacheKey> cacheList = new ArrayList<BlockCacheKey>();
Iterator<CachedBlock> iterator = cache.iterator();
// evict all the blocks
while (iterator.hasNext()) {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
cacheList.add(cacheKey);
cache.evictBlock(cacheKey);
}
try {
Thread.sleep(1);
} catch (InterruptedException e1) {
}
iterator = cache.iterator();
int refBlockCount = 0;
while (iterator.hasNext()) {
iterator.next();
refBlockCount++;
}
assertEquals("One block should be there ", refBlockCount, 1);
// Rescan to prepopulate the data cache with this row.
Scan s1 = new Scan();
// This scan will start from ROW1 and it will populate the cache with a
// row that is lower than ROW3.
s1.setStartRow(ROW3);
s1.setStopRow(ROW5);
s1.setCaching(1);
ResultScanner scanner;
try {
scanner = table.getScanner(s1);
int count = 0;
for (Result result : scanner) {
count++;
}
assertEquals("Count the rows", count, 2);
iterator = cache.iterator();
List<BlockCacheKey> newCacheList = new ArrayList<BlockCacheKey>();
while (iterator.hasNext()) {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
newCacheList.add(cacheKey);
}
int newBlockRefCount = 0;
for (BlockCacheKey key : cacheList) {
if (newCacheList.contains(key)) {
newBlockRefCount++;
}
}
assertEquals("old blocks should still be found ", newBlockRefCount, 6);
latch.countDown();
} catch (IOException e) {
}
}
};
count = 0;
for (Result result : scanner) {
count++;
if (count == 2) {
evictorThread.start();
latch.await();
}
}
assertEquals("Count should give all rows ", count, 10);
} finally {
table.close();
}
}
}