From 2e0f80c3337c89bac1855e1689ab56464295ab83 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Sat, 16 Nov 2013 14:54:04 +0000 Subject: [PATCH] HBASE-9949 Fix the race condition between Compaction and StoreScanner.init git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1542519 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HStore.java | 5 + .../hbase/regionserver/StoreScanner.java | 28 ++- .../hadoop/hbase/util/InjectionEvent.java | 32 ++++ .../hadoop/hbase/util/InjectionHandler.java | 171 ++++++++++++++++++ .../hbase/regionserver/TestStoreScanner.java | 114 ++++++++++++ 5 files changed, 347 insertions(+), 3 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index fecf2475832..bbd75c8fc1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -77,6 +78,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -1421,6 +1424,8 @@ public class HStore implements Store { // scenario that could have happened if continue to hold the lock. notifyChangedReadersObservers(); // At this point the store will use new files for all scanners. + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); // let the archive util decide if we should archive or delete the files LOG.debug("Removing store files after compaction..."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 1c947a174d5..a92dca616b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -38,10 +38,11 @@ import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; /** * Scanner scans both the memstore and the Store. Coalesce KeyValue stream @@ -100,6 +101,13 @@ public class StoreScanner extends NonLazyKeyValueScanner private final long readPt; + // used by the injection framework to test race between StoreScanner construction and compaction + enum StoreScannerCompactionRace { + BEFORE_SEEK, + AFTER_SEEK, + COMPACT_COMPLETE + } + /** An internal constructor. */ protected StoreScanner(Store store, boolean cacheBlocks, Scan scan, final NavigableSet columns, long ttl, int minVersions, long readPt) { @@ -155,6 +163,8 @@ public class StoreScanner extends NonLazyKeyValueScanner ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS); + this.store.addChangedReaderObserver(this); + // Pass columns to try to filter out unnecessary StoreFiles. List scanners = getScannersNoCompaction(); @@ -162,6 +172,8 @@ public class StoreScanner extends NonLazyKeyValueScanner // key does not exist, then to the start of the next matching Row). // Always check bloom filter to optimize the top row seek for delete // family marker. + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); if (explicitColumnQuery && lazySeekEnabledGlobally) { for (KeyValueScanner scanner : scanners) { scanner.requestSeek(matcher.getStartKey(), false, true); @@ -184,8 +196,8 @@ public class StoreScanner extends NonLazyKeyValueScanner // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.getComparator()); - - this.store.addChangedReaderObserver(this); + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.AFTER_SEEK.ordinal()}); } /** @@ -235,9 +247,13 @@ public class StoreScanner extends NonLazyKeyValueScanner earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow); } + this.store.addChangedReaderObserver(this); + // Filter the list of scanners using Bloom filters, time range, TTL, etc. scanners = selectScannersFrom(scanners); + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.BEFORE_SEEK.ordinal()}); // Seek all scanners to the initial key if (!isParallelSeekEnabled) { for (KeyValueScanner scanner : scanners) { @@ -249,6 +265,8 @@ public class StoreScanner extends NonLazyKeyValueScanner // Combine all seeked scanners with a heap heap = new KeyValueHeap(scanners, store.getComparator()); + InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] { + StoreScannerCompactionRace.AFTER_SEEK.ordinal()}); } /** Constructor for testing. */ @@ -280,6 +298,10 @@ public class StoreScanner extends NonLazyKeyValueScanner this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType, Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS); + // In unit tests, the store could be null + if (this.store != null) { + this.store.addChangedReaderObserver(this); + } // Seek all scanners to the initial key if (!isParallelSeekEnabled) { for (KeyValueScanner scanner : scanners) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java new file mode 100644 index 00000000000..6bd88bd2cf4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java @@ -0,0 +1,32 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +/** + * Enumeration of all injection events. + * When defining new events, please PREFIX the name + * with the supervised class. + * + * Please see InjectionHandler. + */ +public enum InjectionEvent { + // Injection into Store.java + STORESCANNER_COMPACTION_RACE +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java new file mode 100644 index 00000000000..ff670bc1c19 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java @@ -0,0 +1,171 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The InjectionHandler is an object provided to a class, + * which can perform custom actions for JUnit testing. + * JUnit test can implement custom version of the handler. + * For example, let's say we want to supervise FSImage object: + * + * + * // JUnit test code + * class MyInjectionHandler extends InjectionHandler { + * protected void _processEvent(InjectionEvent event, + * Object... args) { + * if (event == InjectionEvent.MY_EVENT) { + * LOG.info("Handling my event for fsImage: " + * + args[0].toString()); + * } + * } + * } + * + * public void testMyEvent() { + * InjectionHandler ih = new MyInjectionHandler(); + * InjectionHandler.set(ih); + * ... + * + * InjectionHandler.clear(); + * } + * + * // supervised code example + * + * class FSImage { + * + * private doSomething() { + * ... + * if (condition1 && InjectionHandler.trueCondition(MY_EVENT1) { + * ... + * } + * if (condition2 || condition3 + * || InjectionHandler.falseCondition(MY_EVENT1) { + * ... + * } + * ... + * InjectionHandler.processEvent(MY_EVENT2, this) + * ... + * try { + * read(); + * InjectionHandler.processEventIO(MY_EVENT3, this, object); + * // might throw an exception when testing + * catch (IOEXception) { + * LOG.info("Exception") + * } + * ... + * } + * ... + * } + * + * + * Each unit test should use a unique event type. + * The types can be defined by adding them to + * InjectionEvent class. + * + * methods: + * + * // simulate actions + * void processEvent() + * // simulate exceptions + * void processEventIO() throws IOException + * + * // simulate conditions + * boolean trueCondition() + * boolean falseCondition() + * + * The class implementing InjectionHandler must + * override respective protected methods + * _processEvent() + * _processEventIO() + * _trueCondition() + * _falseCondition() + */ +public class InjectionHandler { + + private static final Log LOG = LogFactory.getLog(InjectionHandler.class); + + // the only handler to which everyone reports + private static InjectionHandler handler = new InjectionHandler(); + + // can not be instantiated outside, unless a testcase extends it + protected InjectionHandler() {} + + // METHODS FOR PRODUCTION CODE + + protected void _processEvent(InjectionEvent event, Object... args) { + // by default do nothing + } + + protected void _processEventIO(InjectionEvent event, Object... args) throws IOException{ + // by default do nothing + } + + protected boolean _trueCondition(InjectionEvent event, Object... args) { + return true; // neutral in conjunction + } + + protected boolean _falseCondition(InjectionEvent event, Object... args) { + return false; // neutral in alternative + } + + //////////////////////////////////////////////////////////// + + /** + * Set to the empty/production implementation. + */ + public static void clear() { + handler = new InjectionHandler(); + } + + /** + * Set custom implementation of the handler. + */ + public static void set(InjectionHandler custom) { + LOG.warn("WARNING: SETTING INJECTION HANDLER" + + " - THIS SHOULD NOT BE USED IN PRODUCTION !!!"); + handler = custom; + } + + /* + * Static methods for reporting to the handler + */ + + public static void processEvent(InjectionEvent event, Object... args) { + handler._processEvent(event, args); + } + + public static void processEventIO(InjectionEvent event, Object... args) + throws IOException { + handler._processEventIO(event, args); + } + + public static boolean trueCondition(InjectionEvent event, Object... args) { + return handler._trueCondition(event, args); + } + + public static boolean falseCondition(InjectionEvent event, Object... args) { + return handler._falseCondition(event, args); + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 745e026f9e6..d5760086a39 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -27,18 +27,28 @@ import java.util.Arrays; import java.util.List; import java.util.NavigableSet; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import junit.framework.TestCase; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.InjectionEvent; +import org.apache.hadoop.hbase.util.InjectionHandler; +import org.apache.hadoop.hbase.util.Threads; import org.junit.experimental.categories.Category; // Can't be small as it plays with EnvironmentEdgeManager @@ -501,6 +511,110 @@ public class TestStoreScanner extends TestCase { assertEquals(false, scanner.next(results)); } + private class StoreScannerCompactionRaceCondition extends InjectionHandler { + final Store store; + Boolean beforeSeek = false; + Boolean afterSeek = false; + Boolean compactionComplete = false; + final int waitTime; + boolean doneSeeking = false; + public Future f; + StoreScannerCompactionRaceCondition(Store s, int waitTime) { + this.store = s; + this.waitTime = waitTime; + } + + protected void _processEvent(InjectionEvent event, Object... args) { + if (event == InjectionEvent.STORESCANNER_COMPACTION_RACE) { + // To prevent other scanners which are not supposed to be tested from taking this code path. + if ((args instanceof Object[]) && (args.length == 1) + && (args[0] instanceof Integer)) { + StoreScannerCompactionRace sscr = StoreScannerCompactionRace.values()[(Integer)args[0]]; + switch (sscr) { + case BEFORE_SEEK : + // Inside StoreScanner ctor before seek. + synchronized (beforeSeek) { + if (!beforeSeek) { + beforeSeek = true; + f = Executors.newSingleThreadExecutor().submit(new Callable() { + @Override + public Void call() throws Exception { + StoreScanner.enableLazySeekGlobally(false); + ((HStore)store).compactRecentForTestingAssumingDefaultPolicy( + store.getStorefiles().size() / 2); + StoreScanner.enableLazySeekGlobally(true); + return null; + } + }); + Threads.sleep(waitTime); + } + } + break; + case AFTER_SEEK: + // Inside StoreScanner ctor after seek. + synchronized (afterSeek) { + if (!afterSeek) { + afterSeek = true; + this.doneSeeking = true; + } + } + break; + case COMPACT_COMPLETE: + // Inside HStore.completeCompaction + synchronized (compactionComplete) { + if (!compactionComplete) { + compactionComplete = true; + assertTrue(doneSeeking); + } + } + break; + } + } + } + } + } + + /* + * Verifies that there is no race condition between StoreScanner construction and compaction. + * This is done through 3 injection points: + * 1. before seek operation in StoreScanner ctor + * 2. after seek operation in StoreScanner ctor + * 3. after compaction completion + */ + public void testCompactionRaceCondition() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + util.startMiniCluster(1); + byte[] t = Bytes.toBytes("tbl"), cf = Bytes.toBytes("cf"); + HTable table = util.createTable(t, cf); + util.loadTable(table, cf); + util.flush(); + util.loadTable(table, cf); + util.flush(); + List regions = util.getHBaseCluster().getRegions(t); + assertTrue(regions.size() == 1); + HRegion r = regions.get(0); + Store s = r.getStore(cf); + + // Setup the injection handler. + StoreScannerCompactionRaceCondition ih = + new StoreScannerCompactionRaceCondition(s, 5); + InjectionHandler.set(ih); + + // Create a StoreScanner + TreeSet set = new TreeSet(Bytes.BYTES_COMPARATOR); + set.add(cf); + Scan scanSpec = new Scan(); + scanSpec.setStartRow(Bytes.toBytes("hjfsd")); + scanSpec.setStartRow(Bytes.toBytes("zjfsd")); + KeyValueScanner scanner = s.getScanner(scanSpec, set, s.getSmallestReadPoint()); + ih.f.get(); + + // Clear injection handling and shutdown the minicluster. + InjectionHandler.clear(); + scanner.close(); + util.shutdownMiniCluster(); + } + public void testDeleteMarkerLongevity() throws Exception { try { final long now = System.currentTimeMillis();