diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomKeyValueUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomKeyValueUtil.java new file mode 100644 index 00000000000..dcd604503ca --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomKeyValueUtil.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import java.util.Random; + +import org.apache.hadoop.hbase.KeyValue; + +/** + * These helper methods generate random byte[]'s data for KeyValues + */ +public class RandomKeyValueUtil { + public static final String COLUMN_FAMILY_NAME = "_-myColumnFamily-_"; + private static final int MIN_ROW_OR_QUALIFIER_LENGTH = 64; + private static final int MAX_ROW_OR_QUALIFIER_LENGTH = 128; + + private RandomKeyValueUtil() { } + + public static final char randomReadableChar(Random rand) { + int i = rand.nextInt(26 * 2 + 10 + 1); + if (i < 26) { + return (char) ('A' + i); + } + i -= 26; + if (i < 26) { + return (char) ('a' + i); + } + i -= 26; + if (i < 10) { + return (char) ('0' + i); + } + i -= 10; + assert i == 0; + return '_'; + } + + public static KeyValue randomKeyValue(Random rand) { + return new KeyValue(randomRowOrQualifier(rand), + COLUMN_FAMILY_NAME.getBytes(), randomRowOrQualifier(rand), + randomValue(rand)); + } + + public static byte[] randomRowOrQualifier(Random rand) { + StringBuilder field = new StringBuilder(); + int fieldLen = MIN_ROW_OR_QUALIFIER_LENGTH + + rand.nextInt(MAX_ROW_OR_QUALIFIER_LENGTH + - MIN_ROW_OR_QUALIFIER_LENGTH + 1); + for (int i = 0; i < fieldLen; ++i) { + field.append(randomReadableChar(rand)); + } + return field.toString().getBytes(); + } + + public static byte[] randomValue(Random rand) { + StringBuilder v = new StringBuilder(); + for (int j = 0; j < 1 + rand.nextInt(2000); ++j) { + v.append((char) (32 + rand.nextInt(95))); + } + byte[] valueBytes = v.toString().getBytes(); + return valueBytes; + } + + /** + * Generates a random key that is guaranteed to increase as the given index i + * increases. The result consists of a prefix, which is a deterministic + * increasing function of i, and a random suffix. + */ + public static byte[] randomOrderedKey(Random rand, int i) { + StringBuilder k = new StringBuilder(); + + // The fixed-length lexicographically increasing part of the key. + for (int bitIndex = 31; bitIndex >= 0; --bitIndex) { + if ((i & (1 << bitIndex)) == 0) { + k.append("a"); + } else { + k.append("b"); + } + } + + // A random-length random suffix of the key. + for (int j = 0; j < rand.nextInt(50); ++j) { + k.append(randomReadableChar(rand)); + } + + byte[] keyBytes = k.toString().getBytes(); + return keyBytes; + } + + public static byte[] randomOrderedFixedLengthKey(Random rand, int i, int suffixLength) { + StringBuilder k = new StringBuilder(); + + // The fixed-length lexicographically increasing part of the key. + for (int bitIndex = 31; bitIndex >= 0; --bitIndex) { + if ((i & (1 << bitIndex)) == 0) { + k.append("a"); + } else { + k.append("b"); + } + } + + // A random suffix of the key. + for (int j = 0; j < suffixLength; ++j) { + k.append(randomReadableChar(rand)); + } + + byte[] keyBytes = k.toString().getBytes(); + return keyBytes; + } + + public static byte[] randomFixedLengthValue(Random rand, int valueLength) { + StringBuilder v = new StringBuilder(); + for (int j = 0; j < valueLength; ++j) { + v.append((char) (32 + rand.nextInt(95))); + } + + byte[] valueBytes = v.toString().getBytes(); + return valueBytes; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java new file mode 100644 index 00000000000..710a6dc9bdb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.KeyValueTestUtil.create; +import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * This test tests whether parallel {@link StoreScanner#close()} and + * {@link StoreScanner#updateReaders(List, List)} works perfectly ensuring + * that there are no references on the existing Storescanner readers. + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestStoreScannerClosure { + + private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2; + @Rule + public TestName name = new TestName(); + private static final String CF_STR = "cf"; + private static HRegion region; + private static final byte[] CF = Bytes.toBytes(CF_STR); + static Configuration CONF = HBaseConfiguration.create(); + private static CacheConfig cacheConf; + private static FileSystem fs; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, + Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR); + private final static byte[] fam = Bytes.toBytes("cf_1"); + private static final KeyValue[] kvs = + new KeyValue[] { create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"), + create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"), + create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"), }; + + @BeforeClass + public static void setUp() throws Exception { + CONF = TEST_UTIL.getConfiguration(); + cacheConf = new CacheConfig(CONF); + fs = TEST_UTIL.getTestFileSystem(); + TableName tableName = TableName.valueOf("test"); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(fam)); + HRegionInfo info = new HRegionInfo(tableName, null, null, false); + Path path = TEST_UTIL.getDataTestDir("test"); + region = HBaseTestingUtility.createRegionAndWAL(info, path, path, + TEST_UTIL.getConfiguration(), htd); + } + + @Test + public void testScannerCloseAndUpdateReaders1() throws Exception { + testScannerCloseAndUpdateReaderInternal(true, false); + } + + @Test + public void testScannerCloseAndUpdateReaders2() throws Exception { + testScannerCloseAndUpdateReaderInternal(false, true); + } + + private Path writeStoreFile() throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), "TestHFile"); + Path storeFilePath = new Path(storeFileParentDir, + UUID.randomUUID().toString().replaceAll("-", "")); + HFileContext meta = new HFileContextBuilder().withBlockSize(64 * 1024).build(); + StoreFile.Writer writer = new StoreFile.WriterBuilder(CONF, cacheConf, fs) + .withFilePath(storeFilePath) + .withFileContext(meta) + .build(); + final int rowLen = 32; + Random RNG = new Random(); + for (int i = 0; i < 1000; ++i) { + byte[] k = RandomKeyValueUtil.randomOrderedKey(RNG, i); + byte[] v = RandomKeyValueUtil.randomValue(RNG); + int cfLen = RNG.nextInt(k.length - rowLen + 1); + KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + cfLen, + k.length - rowLen - cfLen, RNG.nextLong(), generateKeyType(RNG), v, 0, v.length); + writer.append(kv); + } + writer.close(); + return writer.getPath(); + } + + private static KeyValue.Type generateKeyType(Random rand) { + if (rand.nextBoolean()) { + // Let's make half of KVs puts. + return KeyValue.Type.Put; + } else { + KeyValue.Type keyType = KeyValue.Type.values()[1 + rand.nextInt(NUM_VALID_KEY_TYPES)]; + if (keyType == KeyValue.Type.Minimum || keyType == KeyValue.Type.Maximum) { + throw new RuntimeException("Generated an invalid key type: " + keyType + ". " + + "Probably the layout of KeyValue.Type has changed."); + } + return keyType; + } + } + + private StoreFile readStoreFile(Path storeFilePath, Configuration conf) throws Exception { + // Open the file reader with block cache disabled. + StoreFile file = new StoreFile(fs, storeFilePath, conf, cacheConf, BloomType.NONE); + return file; + } + + private void testScannerCloseAndUpdateReaderInternal(final boolean awaitUpdate, + final boolean awaitClose) throws IOException, InterruptedException { + // start write to store file. + Path path = writeStoreFile(); + StoreFile file = null; + final List files = new ArrayList(); + try { + file = readStoreFile(path, CONF); + files.add(file); + } catch (Exception e) { + // fail test + assertTrue(false); + } + scanFixture(kvs); + // scanners.add(storeFileScanner); + try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo, + new Scan(), getCols("a", "d"), 100L)) { + Thread closeThread = new Thread() { + public void run() { + scan.close(awaitClose, true); + } + }; + closeThread.start(); + Thread updateThread = new Thread() { + public void run() { + try { + scan.updateReaders(awaitUpdate, files, Collections.emptyList()); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + updateThread.start(); + // complete both the threads + closeThread.join(); + // complete both the threads + updateThread.join(); + if (file.getReader() != null) { + // the fileReader is not null when the updateReaders has completed first. + // in the other case the fileReader will be null. + int refCount = file.getReader().getRefCount(); + assertTrue("The store scanner count should be 0", refCount == 0); + } + } + } + + private static class ExtendedStoreScanner extends StoreScanner { + private CountDownLatch latch = new CountDownLatch(1); + + public ExtendedStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + NavigableSet columns, long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + public void updateReaders(boolean await, List sfs, + List memStoreScanners) throws IOException { + if (await) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + super.updateReaders(sfs, memStoreScanners); + if (!await) { + latch.countDown(); + } + } + + // creating a dummy close + public void close(boolean await, boolean dummy) { + if (await) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + super.close(); + if (!await) { + latch.countDown(); + } + } + } + + NavigableSet getCols(String... strCols) { + NavigableSet cols = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (String col : strCols) { + byte[] bytes = Bytes.toBytes(col); + cols.add(bytes); + } + return cols; + } +}