diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index e0b4eb9d83b..53b12620fdc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -79,8 +79,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private int storeOffset = 0; // Used to indicate that the scanner has closed (see HBASE-1107) - // Do not need to be volatile because it's always accessed via synchronized methods - private boolean closing = false; + private volatile boolean closing = false; private final boolean get; private final boolean explicitColumnQuery; private final boolean useRowColBloom; @@ -157,6 +156,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner final List currentScanners = new ArrayList<>(); // flush update lock private final ReentrantLock flushLock = new ReentrantLock(); + // lock for closing. + private final ReentrantLock closeLock = new ReentrantLock(); protected final long readPt; private boolean topChanged = false; @@ -473,31 +474,38 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } private void close(boolean withDelayedScannersClose) { - if (this.closing) { - return; - } - if (withDelayedScannersClose) { - this.closing = true; - } - // For mob compaction, we do not have a store. - if (this.store != null) { - this.store.deleteChangedReaderObserver(this); - } - if (withDelayedScannersClose) { - clearAndClose(scannersForDelayedClose); - clearAndClose(memStoreScannersAfterFlush); - clearAndClose(flushedstoreFileScanners); - if (this.heap != null) { - this.heap.close(); - this.currentScanners.clear(); - this.heap = null; // CLOSED! + closeLock.lock(); + // If the closeLock is acquired then any subsequent updateReaders() + // call is ignored. + try { + if (this.closing) { + return; } - } else { - if (this.heap != null) { - this.scannersForDelayedClose.add(this.heap); - this.currentScanners.clear(); - this.heap = null; + if (withDelayedScannersClose) { + this.closing = true; } + // For mob compaction, we do not have a store. + if (this.store != null) { + this.store.deleteChangedReaderObserver(this); + } + if (withDelayedScannersClose) { + clearAndClose(scannersForDelayedClose); + clearAndClose(memStoreScannersAfterFlush); + clearAndClose(flushedstoreFileScanners); + if (this.heap != null) { + this.heap.close(); + this.currentScanners.clear(); + this.heap = null; // CLOSED! + } + } else { + if (this.heap != null) { + this.scannersForDelayedClose.add(this.heap); + this.currentScanners.clear(); + this.heap = null; + } + } + } finally { + closeLock.unlock(); } } @@ -876,8 +884,25 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { return; } + boolean updateReaders = false; flushLock.lock(); try { + if (!closeLock.tryLock()) { + // The reason for doing this is that when the current store scanner does not retrieve + // any new cells, then the scanner is considered to be done. The heap of this scanner + // is not closed till the shipped() call is completed. Hence in that case if at all + // the partial close (close (false)) has been called before updateReaders(), there is no + // need for the updateReaders() to happen. + LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); + // no lock acquired. + return; + } + // lock acquired + updateReaders = true; + if (this.closing) { + LOG.debug("StoreScanner already closing. There is no need to updateReaders"); + return; + } flushed = true; final boolean isCompaction = false; boolean usePread = get || scanUsePread; @@ -896,6 +921,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } } finally { flushLock.unlock(); + if (updateReaders) { + closeLock.unlock(); + } } // Let the next() call handle re-creating and seeking } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index d5775da83a2..8a3f254d96b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -34,12 +34,14 @@ import java.util.NavigableSet; import java.util.OptionalInt; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; @@ -74,6 +76,7 @@ public class TestStoreScanner { private static final String CF_STR = "cf"; private static final byte[] CF = Bytes.toBytes(CF_STR); static Configuration CONF = HBaseConfiguration.create(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); @@ -847,7 +850,6 @@ public class TestStoreScanner { } } - @Test @Ignore("this fails, since we don't handle deletions, etc, in peek") public void testPeek() throws Exception { KeyValue[] kvs = new KeyValue [] { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java new file mode 100644 index 00000000000..1fbef261b7e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.KeyValueTestUtil.create; +import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellComparatorImpl; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * This test tests whether parallel {@link StoreScanner#close()} and + * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring + * that there are no references on the existing Storescanner readers. + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestStoreScannerClosure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStoreScannerClosure.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestStoreScannerClosure.class); + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; + @Rule + public TestName name = new TestName(); + private static final String CF_STR = "cf"; + private static HRegion region; + private static final byte[] CF = Bytes.toBytes(CF_STR); + static Configuration CONF = HBaseConfiguration.create(); + private static CacheConfig cacheConf; + private static FileSystem fs; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static String ROOT_DIR = TEST_UTIL.getDataTestDir("TestHFile").toString(); + private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); + private final static byte[] fam = Bytes.toBytes("cf_1"); + private static final KeyValue[] kvs = + new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"), + create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), }; + + @BeforeClass + public static void setUp() throws Exception { + CONF = TEST_UTIL.getConfiguration(); + cacheConf = new CacheConfig(CONF); + fs = TEST_UTIL.getTestFileSystem(); + TableName tableName = TableName.valueOf("test"); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(fam)); + HRegionInfo info = new HRegionInfo(tableName, null, null, false); + Path path = TEST_UTIL.getDataTestDir("test"); + region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd); + } + + @Test + public void testScannerCloseAndUpdateReaders1() throws Exception { + testScannerCloseAndUpdateReaderInternal(true, false); + } + + @Test + public void testScannerCloseAndUpdateReaders2() throws Exception { + testScannerCloseAndUpdateReaderInternal(false, true); + } + + private Path writeStoreFile() throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); + HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); + StoreFileWriter sfw = new StoreFileWriter.Builder(CONF, fs).withOutputDir(storeFileParentDir) + .withComparator(CellComparatorImpl.COMPARATOR).withFileContext(meta).build(); + + final int rowLen = 32; + Random RNG = new Random(); + for (int i = 0; i < 1000; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i); + byte[] v = RandomKeyValueUtil.randomValue(RNG); + int cfLen = RNG.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, RNG.nextLong(), generateKeyType(RNG), v, 0, v.length); + sfw.append(kv); + } + + sfw.close(); + return sfw.getPath(); + } + + private static KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". " + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + + private HStoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception { + // Open the file reader with block cache disabled. + HStoreFile file = new HStoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE, true); + return file; + } + + private void testScannerCloseAndUpdateReaderInternal(boolean awaitUpdate, boolean awaitClose) + throws IOException, InterruptedException { + // start write to store file. + Path path = writeStoreFile(); + HStoreFile file = null; + List files = new ArrayList(); + try { + file = readStoreFile(path, CONF); + files.add(file); + } catch (Exception e) { + // fail test + assertTrue(false); + } + scanFixture(kvs); + // scanners.add(storeFileScanner); + try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, + new Scan(), getCols("a", "d"), 100L)) { + Thread closeThread = new Thread() { + public void run() { + scan.close(awaitClose, true); + } + }; + closeThread.start(); + Thread updateThread = new Thread() { + public void run() { + try { + scan.updateReaders(awaitUpdate, files, Collections.emptyList()); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + updateThread.start(); + // complete both the threads + closeThread.join(); + // complete both the threads + updateThread.join(); + if (file.getReader() != null) { + // the fileReader is not null when the updateReaders has completed first. + // in the other case the fileReader will be null. + int refCount = file.getReader().getRefCount(); + LOG.info("the store scanner count is " + refCount); + assertTrue("The store scanner count should be 0", refCount == 0); + } + } + } + + private static class ExtendedStoreScanner extends StoreScanner { + private CountDownLatch latch = new CountDownLatch(1); + + public ExtendedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, + NavigableSet columns, long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + public void updateReaders(boolean await, List sfs, + List memStoreScanners) throws IOException { + if (await) { + try { + latch.await(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + super.updateReaders(sfs, memStoreScanners); + if (!await) { + latch.countDown(); + } + } + + // creating a dummy close + public void close(boolean await, boolean dummy) { + if (await) { + try { + latch.await(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + super.close(); + if (!await) { + latch.countDown(); + } + } + } + + NavigableSet getCols(String... strCols) { + NavigableSet cols = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (String col : strCols) { + byte[] bytes = Bytes.toBytes(col); + cols.add(bytes); + } + return cols; + } +}