diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java index 8603fe1f229..452f40f2060 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java @@ -19,12 +19,17 @@ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; +import org.junit.experimental.categories.Category; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +@Category({ SmallTests.class, ClientTests.class }) public class TestPut { @Test public void testCopyConstructor() { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java index 97e9574de9d..9c01cd9a9a6 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java @@ -19,13 +19,17 @@ package org.apache.hadoop.hbase.exceptions; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; +import org.junit.experimental.categories.Category; import java.io.IOException; import static org.junit.Assert.*; @SuppressWarnings("ThrowableInstanceNeverThrown") +@Category({ SmallTests.class, ClientTests.class }) public class TestClientExceptionsUtil { @Test diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index b723f58d51e..7b9bcb132df 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -97,7 +97,7 @@ public class KeyValueUtil { } - /**************** copy key only *********************/ + /**************** copy the cell to create a new keyvalue *********************/ public static KeyValue copyToNewKeyValue(final Cell cell) { byte[] bytes = copyToNewByteArray(cell); @@ -118,6 +118,21 @@ public class KeyValueUtil { return buffer; } + /** + * Copies the key to a new KeyValue + * @param cell the cell whose key is to be copied + * @return the KeyValue that consists only of the key part of the incoming cell + */ + public static KeyValue toNewKeyCell(final Cell cell) { + byte[] bytes = new byte[keyLength(cell)]; + appendKeyTo(cell, bytes, 0); + KeyValue kv = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length); + // Set the seq id. The new key cell could be used in comparisons so it + // is important that it uses the seqid also. 
If not the comparison would fail + kv.setSequenceId(cell.getSequenceId()); + return kv; + } + public static byte[] copyToNewByteArray(final Cell cell) { int v1Length = length(cell); byte[] backingBytes = new byte[v1Length]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java index 3193a175935..a50566ac476 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.BloomFilterChunk; @@ -60,6 +60,8 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase /** The size of individual Bloom filter chunks to create */ private int chunkByteSize; + /** The prev Cell that was processed */ + private Cell prevCell; /** A Bloom filter chunk enqueued for writing */ private static class ReadyChunk { @@ -159,7 +161,7 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase } @Override - public void add(Cell cell) { + public void append(Cell cell) throws IOException { if (cell == null) throw new NullPointerException(); @@ -181,9 +183,22 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase } chunk.add(cell); + this.prevCell = cell; ++totalKeyCount; } + @Override + public void beforeShipped() throws IOException { + if (this.prevCell != null) { + this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); + } + } + + @Override + public Cell getPrevCell() { + return this.prevCell; + } + private void allocateNewChunk() { if (prevChunk == null) { // First chunk diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index d3669f4e52d..0b6ceef156b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.regionserver.CellSink; +import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -194,15 +196,13 @@ public class HFile { } /** API required to write an {@link HFile} */ - public interface Writer extends Closeable { + public interface Writer extends Closeable, CellSink, ShipperListener { /** Max memstore (mvcc) timestamp in FileInfo */ public static final byte [] MAX_MEMSTORE_TS_KEY = 
Bytes.toBytes("MAX_MEMSTORE_TS_KEY"); /** Add an element to the file info map. */ void appendFileInfo(byte[] key, byte[] value) throws IOException; - void append(Cell cell) throws IOException; - /** @return the path to this {@link HFile} */ Path getPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index c57ecf746ce..c27bf0befa5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.CellComparator.MetaCellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.compress.Compression; @@ -701,6 +702,20 @@ public class HFileWriterImpl implements HFile.Writer { } } + @Override + public void beforeShipped() throws IOException { + // Add clone methods for every cell + if (this.lastCell != null) { + this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell); + } + if (this.firstCellInBlock != null) { + this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock); + } + if (this.lastCellOfPreviousBlock != null) { + this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock); + } + } + protected void finishFileInfo() throws IOException { if (lastCell != null) { // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 55aea003884..d75e4481671 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java index a4e02859412..073562994d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -26,13 +26,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink; +import org.apache.hadoop.hbase.regionserver.CellSink; /** * Base class for cell sink that separates the provided cells into multiple files. 
*/ @InterfaceAudience.Private -public abstract class AbstractMultiFileWriter implements CellSink { +public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener { private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class); @@ -116,4 +116,13 @@ public abstract class AbstractMultiFileWriter implements CellSink { */ protected void preCloseWriter(StoreFileWriter writer) throws IOException { } + + @Override + public void beforeShipped() throws IOException { + if (this.writers() != null) { + for (StoreFileWriter writer : writers()) { + writer.beforeShipped(); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java new file mode 100644 index 00000000000..65b119fe91e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.BloomFilterWriter; + +/** + * A sink of cells that allows appending cells to the Writers that implement it. + * {@link org.apache.hadoop.hbase.io.hfile.HFile.Writer}, + * {@link StoreFileWriter}, {@link AbstractMultiFileWriter}, + * {@link BloomFilterWriter} are some implementors of this. + */ +@InterfaceAudience.Private +public interface CellSink { + /** + * Append the given cell + * @param cell the cell to be added + * @throws IOException + */ + void append(Cell cell) throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 7a419e4395f..ce5c91dc856 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -971,6 +971,8 @@ public class HStore implements Store { * @param includesTag - includesTag or not * @return Writer for a new StoreFile in the tmp dir. 
*/ + // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of + // compaction @Override public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java new file mode 100644 index 00000000000..657bae0e2c2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Implementors of this interface are the ones who need to do some action when + * {@link Shipper#shipped()} is called + */ +@InterfaceAudience.Private +public interface ShipperListener { + + /** + * The action that needs to be performed before {@link Shipper#shipped()} is called + * @throws IOException + */ + void beforeShipped() throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index cb5d12cafeb..bd1d62e9313 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.util.BloomContext; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterWriter; @@ -51,7 +50,7 @@ import com.google.common.base.Preconditions; * local because it is an implementation detail of the HBase regionserver. */ @InterfaceAudience.Private -public class StoreFileWriter implements Compactor.CellSink { +public class StoreFileWriter implements CellSink, ShipperListener { private static final Log LOG = LogFactory.getLog(StoreFileWriter.class.getName()); private final BloomFilterWriter generalBloomFilterWriter; @@ -120,6 +119,7 @@ public class StoreFileWriter implements Compactor.CellSink { // it no longer writable. this.timeRangeTrackerSet = trt != null; this.timeRangeTracker = this.timeRangeTrackerSet? 
trt: new TimeRangeTracker(); + // TODO : Change all writers to be specifically created for compaction context writer = HFile.getWriterFactory(conf, cacheConf) .withPath(fs, path) .withComparator(comparator) @@ -140,10 +140,10 @@ public class StoreFileWriter implements Compactor.CellSink { // init bloom context switch (bloomType) { case ROW: - bloomContext = new RowBloomContext(generalBloomFilterWriter); + bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator); break; case ROWCOL: - bloomContext = new RowColBloomContext(generalBloomFilterWriter); + bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator); break; default: throw new IOException( @@ -160,7 +160,7 @@ public class StoreFileWriter implements Compactor.CellSink { this.deleteFamilyBloomFilterWriter = BloomFilterFactory .createDeleteBloomAtWrite(conf, cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); - deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter); + deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter, comparator); } else { deleteFamilyBloomFilterWriter = null; } @@ -251,6 +251,7 @@ public class StoreFileWriter implements Compactor.CellSink { } } + @Override public void append(final Cell cell) throws IOException { appendGeneralBloomfilter(cell); appendDeleteFamilyBloomFilter(cell); @@ -258,6 +259,19 @@ public class StoreFileWriter implements Compactor.CellSink { trackTimestamps(cell); } + @Override + public void beforeShipped() throws IOException { + // For now these writers will always be of the ShipperListener type. + // TODO : Change all writers to be specifically created for compaction context + writer.beforeShipped(); + if (generalBloomFilterWriter != null) { + generalBloomFilterWriter.beforeShipped(); + } + if (deleteFamilyBloomFilterWriter != null) { + deleteFamilyBloomFilterWriter.beforeShipped(); + } + } + public Path getPath() { return this.writer.getPath(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 5ba7d335410..056cdcccc2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -111,7 +110,7 @@ abstract class StoreFlusher { * @param smallestReadPoint Smallest read point used for the flush. 
* @param throughputController A controller to avoid flush too fast */ - protected void performFlush(InternalScanner scanner, Compactor.CellSink sink, + protected void performFlush(InternalScanner scanner, CellSink sink, long smallestReadPoint, ThroughputController throughputController) throws IOException { int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e008a402dcf..cb40909db05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; @@ -991,6 +992,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public void shipped() throws IOException { + if (prevCell != null) { + // Do the copy here so that, in case the prevCell ref is pointing into previous + // blocks, we can safely release those blocks. + // This applies to blocks fetched from the bucket cache, the L1 cache and HDFS. Copying + // the key ensures that we release the references to these blocks so that they can be + // GCed safely (in the bucket cache case). + prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); + } + matcher.beforeShipped(); for (KeyValueHeap h : this.heapsForDelayedClose) { h.close();// There wont be further fetch of Cells from these scanners. Just close. 
} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index c695788934b..f4bd9a81ac5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.io.InterruptedIOException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,11 +38,13 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileReader; @@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; -import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; @@ -91,10 +91,6 @@ public abstract class Compactor { HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD); } - public interface CellSink { - void append(Cell cell) throws IOException; - } - protected interface CellSinkFactory { S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind) throws IOException; @@ -391,6 +387,7 @@ public abstract class Compactor { protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { + assert writer instanceof ShipperListener; long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -443,6 +440,9 @@ public abstract class Compactor { } } if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + // Clone the cells that are in the writer so that they are freed of references, + // if they are holding any. + ((ShipperListener)writer).beforeShipped(); // The SHARED block references, being read for compaction, will be kept in prevBlocks // list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells // being returned to client, we will call shipped() which can clear this list. 
Here by diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java index 17c6afe0d89..7a2a1e28434 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; /** @@ -50,7 +51,7 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchC * This class is NOT thread-safe as queries are never multi-threaded */ @InterfaceAudience.Private -public interface ColumnTracker { +public interface ColumnTracker extends ShipperListener { /** * Checks if the column is present in the list of requested columns by returning the match code diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java index da65c78ba8a..b4825f09f8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java @@ -248,4 +248,9 @@ public class ExplicitColumnTracker implements ColumnTracker { public boolean isDone(long timestamp) { return minVersions <= 0 && isExpired(timestamp); } + + @Override + public void beforeShipped() throws IOException { + // do nothing + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java index b5469d39bf5..82aae6c2aad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java @@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult; import org.apache.hadoop.hbase.util.Bytes; @@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes; * A query matcher that is specifically designed for the scan case. */ @InterfaceAudience.Private -public abstract class ScanQueryMatcher { +public abstract class ScanQueryMatcher implements ShipperListener { /** * {@link #match} return codes. 
These instruct the scanner moving through memstores and StoreFiles @@ -334,6 +335,16 @@ public abstract class ScanQueryMatcher { */ public abstract Cell getNextKeyHint(Cell cell) throws IOException; + @Override + public void beforeShipped() throws IOException { + if (this.currentRow != null) { + this.currentRow = CellUtil.createFirstOnRow(CellUtil.copyRow(this.currentRow)); + } + if (columns != null) { + columns.beforeShipped(); + } + } + protected static DeleteTracker instantiateDeleteTracker(RegionCoprocessorHost host) throws IOException { DeleteTracker tracker = new ScanDeleteTracker(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java index e3994b68883..a73cc0b8be4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode; import org.apache.hadoop.hbase.util.Bytes; @@ -193,6 +194,13 @@ public class ScanWildcardColumnTracker implements ColumnTracker { return MatchCode.SEEK_NEXT_COL; } + @Override + public void beforeShipped() { + if (columnCell != null) { + this.columnCell = KeyValueUtil.toNewKeyCell(this.columnCell); + } + } + public boolean isDone(long timestamp) { return minVersions <= 0 && isExpired(timestamp); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java index fc40aafe3d3..8a1c6cf8c97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFile; @@ -30,17 +31,16 @@ import org.apache.hadoop.hbase.io.hfile.HFile; @InterfaceAudience.Private public abstract class BloomContext { - // TODO : Avoid holding references to lastCell - protected Cell lastCell; - protected BloomFilterWriter bloomFilterWriter; + protected CellComparator comparator; - public BloomContext(BloomFilterWriter bloomFilterWriter) { + public BloomContext(BloomFilterWriter bloomFilterWriter, CellComparator comparator) { this.bloomFilterWriter = bloomFilterWriter; + this.comparator = comparator; } public Cell getLastCell() { - return this.lastCell; + return this.bloomFilterWriter.getPrevCell(); } /** @@ -51,8 +51,17 @@ public abstract class BloomContext { public void writeBloom(Cell cell) throws IOException { // only add to the bloom filter on a new, unique key if (isNewKey(cell)) { - bloomFilterWriter.add(cell); - this.lastCell = cell; + sanityCheck(cell); + bloomFilterWriter.append(cell); + } + } + + private void sanityCheck(Cell cell) throws IOException { + if (this.getLastCell() != null) { + if 
(comparator.compare(cell, this.getLastCell()) <= 0) { + throw new IOException("Added a key not lexically larger than" + " previous. Current cell = " + + cell + ", prevCell = " + this.getLastCell()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java index 32a9ff4bbef..82d6d16eb51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.util; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.CellSink; +import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.io.Writable; /** @@ -28,7 +30,7 @@ import org.apache.hadoop.io.Writable; * resulting Bloom filter as a sequence of bytes. */ @InterfaceAudience.Private -public interface BloomFilterWriter extends BloomFilterBase { +public interface BloomFilterWriter extends BloomFilterBase, CellSink, ShipperListener { /** Compact the Bloom filter before writing metadata & data to disk. */ void compactBloom(); @@ -48,8 +50,8 @@ public interface BloomFilterWriter extends BloomFilterBase { Writable getDataWriter(); /** - * Add the specified binary to the bloom filter. - * @param cell the cell data to be added to the bloom + * Returns the previous cell written by this writer + * @return the previous cell */ - void add(Cell cell); + Cell getPrevCell(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java index f6e36d4de41..592f177479f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; @@ -31,21 +32,21 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; @InterfaceAudience.Private public class RowBloomContext extends BloomContext { - public RowBloomContext(BloomFilterWriter bloomFilterWriter) { - super(bloomFilterWriter); + public RowBloomContext(BloomFilterWriter bloomFilterWriter, CellComparator comparator) { + super(bloomFilterWriter, comparator); } public void addLastBloomKey(Writer writer) throws IOException { - if (lastCell != null) { - byte[] key = CellUtil.copyRow(this.lastCell); + if (this.getLastCell() != null) { + byte[] key = CellUtil.copyRow(this.getLastCell()); writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, key); } } @Override protected boolean isNewKey(Cell cell) { - if (this.lastCell != null) { - return !CellUtil.matchingRows(cell, this.lastCell); + if (this.getLastCell() != null) { + return !CellUtil.matchingRows(cell, this.getLastCell()); } return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java index c1b47af8012..eb0f72140d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; @@ -32,14 +33,14 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; @InterfaceAudience.Private public class RowColBloomContext extends BloomContext { - public RowColBloomContext(BloomFilterWriter generalBloomFilterWriter) { - super(generalBloomFilterWriter); + public RowColBloomContext(BloomFilterWriter generalBloomFilterWriter, CellComparator comparator) { + super(generalBloomFilterWriter, comparator); } @Override public void addLastBloomKey(Writer writer) throws IOException { - if (this.lastCell != null) { - Cell firstOnRow = CellUtil.createFirstOnRowCol(this.lastCell); + if (this.getLastCell() != null) { + Cell firstOnRow = CellUtil.createFirstOnRowCol(this.getLastCell()); // This copy happens only once when the writer is closed byte[] key = CellUtil.getCellKeySerializedAsKeyValueKey(firstOnRow); writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, key); @@ -48,8 +49,8 @@ public class RowColBloomContext extends BloomContext { @Override protected boolean isNewKey(Cell cell) { - if (this.lastCell != null) { - return !CellUtil.matchingRowColumn(cell, this.lastCell); + if (this.getLastCell() != null) { + return !CellUtil.matchingRowColumn(cell, this.getLastCell()); } return true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java new file mode 100644 index 00000000000..cca0eb7dad2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class, ClientTests.class }) +@SuppressWarnings("deprecation") +public class TestAvoidCellReferencesIntoShippedBlocks { + private static final Log LOG = LogFactory.getLog(TestAvoidCellReferencesIntoShippedBlocks.class); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static byte[][] ROWS = new byte[2][]; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[] ROW1 = Bytes.toBytes("testRow1"); + private static byte[] ROW2 = Bytes.toBytes("testRow2"); + private static byte[] ROW3 = Bytes.toBytes("testRow3"); + private static byte[] ROW4 = Bytes.toBytes("testRow4"); + private static byte[] ROW5 = Bytes.toBytes("testRow5"); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES_1 = new byte[1][0]; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1"); + private static byte[] data = new byte[1000]; + protected static int SLAVES = 1; + private CountDownLatch latch = new CountDownLatch(1); + private static CountDownLatch compactReadLatch = new CountDownLatch(1); + private static AtomicBoolean doScan = new AtomicBoolean(false); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ROWS[0] = ROW; + ROWS[1] = ROW1; + Configuration conf = TEST_UTIL.getConfiguration(); + 
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName()); + conf.setBoolean("hbase.table.sanity.checks", true); // enable for below + // tests + conf.setInt("hbase.regionserver.handler.count", 20); + conf.setInt("hbase.bucketcache.size", 400); + conf.setStrings("hbase.bucketcache.ioengine", "offheap"); + conf.setInt("hbase.hstore.compactionThreshold", 7); + conf.setFloat("hfile.block.cache.size", 0.2f); + conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000); + FAMILIES_1[0] = FAMILY; + TEST_UTIL.startMiniCluster(SLAVES); + compactReadLatch = new CountDownLatch(1); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testHBase16372InCompactionWritePath() throws Exception { + TableName tableName = TableName.valueOf("testHBase16372InCompactionWritePath"); + // Create a table with block size as 1024 + final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, + CompactorRegionObserver.class.getName()); + try { + // get the block cache and region + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = + TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + final BlockCache cache = cacheConf.getBlockCache(); + // insert data. 
5 Rows are added + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW1); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + // data was in memstore so don't expect any changes + region.flush(true); + put = new Put(ROW1); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW2); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW2); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + // data was in memstore so don't expect any changes + region.flush(true); + put = new Put(ROW3); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW3); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW4); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + // data was in memstore so don't expect any changes + region.flush(true); + put = new Put(ROW4); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW5); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW5); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + // data was in memstore so don't expect any changes + region.flush(true); + // Load cache + Scan s = new Scan(); + s.setMaxResultSize(1000); + ResultScanner scanner = table.getScanner(s); + int count = 0; + for (Result result : scanner) { + count++; + } + assertEquals("Count all the rows ", count, 6); + // all the cache is loaded + // trigger a major compaction + ScannerThread scannerThread = new ScannerThread(table, cache); + scannerThread.start(); + region.compact(true); + s = new Scan(); + s.setMaxResultSize(1000); + scanner = table.getScanner(s); + count = 0; + for (Result result : scanner) { + count++; + } + assertEquals("Count all the rows ", count, 6); + } finally { + table.close(); + } + } + + private static class ScannerThread extends Thread { + private final Table table; + private final BlockCache cache; + + public ScannerThread(Table table, BlockCache cache) { + this.table = table; + this.cache = cache; + } + + public void run() { + Scan s = new Scan(); + s.setCaching(1); + s.setStartRow(ROW4); + s.setStopRow(ROW5); + try { + while(!doScan.get()) { + try { + // Sleep till you start scan + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + List cacheList = new ArrayList(); + Iterator iterator = cache.iterator(); + // evict all the blocks + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + cacheList.add(cacheKey); + // evict what ever is available + cache.evictBlock(cacheKey); + } + ResultScanner scanner = table.getScanner(s); + for (Result res : scanner) { + + } + compactReadLatch.countDown(); + } catch (IOException e) { + } + } + } + + public static class CompactorRegionObserver extends BaseRegionObserver { + @Override + public InternalScanner preCompactScannerOpen(ObserverContext c, + Store store, List scanners, ScanType scanType, + long earliestPutTs, InternalScanner s) throws IOException { + return createCompactorScanner(store, scanners, scanType, earliestPutTs); + } + + @Override + public InternalScanner preCompactScannerOpen(ObserverContext c, + Store store, List scanners, ScanType scanType, + long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { + return createCompactorScanner(store, scanners, 
scanType, earliestPutTs); + } + + private InternalScanner createCompactorScanner(Store store, + List scanners, ScanType scanType, long earliestPutTs) + throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(store.getFamily().getMaxVersions()); + return new CompactorStoreScanner(store, store.getScanInfo(), scan, scanners, scanType, + store.getSmallestReadPoint(), earliestPutTs); + } + } + + private static class CompactorStoreScanner extends StoreScanner { + + public CompactorStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { + super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs); + } + + @Override + public boolean next(List outResult, ScannerContext scannerContext) throws IOException { + boolean next = super.next(outResult, scannerContext); + for (Cell cell : outResult) { + if(CellComparator.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { + try { + // hold the compaction + // set doscan to true + doScan.compareAndSet(false, true); + compactReadLatch.await(); + } catch (InterruptedException e) { + } + } + } + return next; + } + } + + @Test + public void testHBASE16372InReadPath() throws Exception { + TableName tableName = TableName.valueOf("testHBASE16372"); + // Create a table with block size as 1024 + final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null); + try { + // get the block cache and region + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); + String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + Region region = + TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName); + Store store = region.getStores().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + final BlockCache cache = cacheConf.getBlockCache(); + // insert data. 
5 Rows are added + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW1); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW1); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW2); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW2); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW3); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW3); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW4); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW4); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + put = new Put(ROW5); + put.addColumn(FAMILY, QUALIFIER, data); + table.put(put); + put = new Put(ROW5); + put.addColumn(FAMILY, QUALIFIER1, data); + table.put(put); + // data was in memstore so don't expect any changes + region.flush(true); + // Load cache + Scan s = new Scan(); + s.setMaxResultSize(1000); + ResultScanner scanner = table.getScanner(s); + int count = 0; + for (Result result : scanner) { + count++; + } + assertEquals("Count all the rows ", count, 6); + + // Scan from cache + s = new Scan(); + // Start a scan from row3 + s.setCaching(1); + s.setStartRow(ROW1); + // set partial as true so that the scan can send partial columns also + s.setAllowPartialResults(true); + s.setMaxResultSize(1000); + scanner = table.getScanner(s); + Thread evictorThread = new Thread() { + @Override + public void run() { + List cacheList = new ArrayList(); + Iterator iterator = cache.iterator(); + // evict all the blocks + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + cacheList.add(cacheKey); + cache.evictBlock(cacheKey); + } + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + } + iterator = cache.iterator(); + int refBlockCount = 0; + while (iterator.hasNext()) { + iterator.next(); + refBlockCount++; + } + assertEquals("One block should be there ", refBlockCount, 1); + // Rescan to prepopulate the data + // cache this row. + Scan s1 = new Scan(); + // This scan will start from ROW1 and it will populate the cache with a + // row that is lower than ROW3. + s1.setStartRow(ROW3); + s1.setStopRow(ROW5); + s1.setCaching(1); + ResultScanner scanner; + try { + scanner = table.getScanner(s1); + int count = 0; + for (Result result : scanner) { + count++; + } + assertEquals("Count the rows", count, 2); + iterator = cache.iterator(); + List newCacheList = new ArrayList(); + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + newCacheList.add(cacheKey); + } + int newBlockRefCount = 0; + for (BlockCacheKey key : cacheList) { + if (newCacheList.contains(key)) { + newBlockRefCount++; + } + } + + assertEquals("old blocks should still be found ", newBlockRefCount, 6); + latch.countDown(); + + } catch (IOException e) { + } + } + }; + count = 0; + for (Result result : scanner) { + count++; + if (count == 2) { + evictorThread.start(); + latch.await(); + } + } + assertEquals("Count should give all rows ", count, 10); + } finally { + table.close(); + } + } +}
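
For context, the contract this patch introduces is that a writer must get a chance to copy the key cells it caches (lastCell, firstCellInBlock, prevCell, and so on) out of the shared blocks before the scanner's Shipper#shipped() call releases those blocks. A minimal sketch of that ordering is shown below; it is illustrative only and not part of the patch. The ShippedCallOrdering class and its ship() helper are hypothetical, while CellSink, ShipperListener and Shipper#shipped() are the interfaces touched above.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;

/** Hypothetical helper mirroring the ordering that Compactor#performCompaction relies on. */
public final class ShippedCallOrdering {

  private ShippedCallOrdering() {
  }

  /**
   * @param writer  a cell sink that also listens for shipped() calls,
   *                e.g. a StoreFileWriter after this patch
   * @param shipper the scanner holding references to the blocks read so far
   */
  public static <W extends CellSink & ShipperListener> void ship(W writer, Shipper shipper)
      throws IOException {
    // 1. Let the writer clone its cached key cells into fresh byte arrays so that it
    //    no longer points into the blocks that are about to be released.
    writer.beforeShipped();
    // 2. Only now is it safe to release the block references; bucket cache backed
    //    blocks can then be freed without the writer later reading released memory.
    shipper.shipped();
  }
}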