HBASE-9949 Fix the race condition between Compaction and StoreScanner.init

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1542519 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-11-16 14:54:04 +00:00
parent b33e4daa2f
commit 2e0f80c333
5 changed files with 347 additions and 3 deletions

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -77,6 +78,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.InjectionEvent;
import org.apache.hadoop.hbase.util.InjectionHandler;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@ -1421,6 +1424,8 @@ public class HStore implements Store {
// scenario that could have happened if continue to hold the lock.
notifyChangedReadersObservers();
// At this point the store will use new files for all scanners.
InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");

View File

@ -38,10 +38,11 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.InjectionEvent;
import org.apache.hadoop.hbase.util.InjectionHandler;
/**
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream
@ -100,6 +101,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
private final long readPt;
// used by the injection framework to test race between StoreScanner construction and compaction
enum StoreScannerCompactionRace {
BEFORE_SEEK,
AFTER_SEEK,
COMPACT_COMPLETE
}
/** An internal constructor. */
protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
@ -155,6 +163,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS);
this.store.addChangedReaderObserver(this);
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScannersNoCompaction();
@ -162,6 +172,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
if (explicitColumnQuery && lazySeekEnabledGlobally) {
for (KeyValueScanner scanner : scanners) {
scanner.requestSeek(matcher.getStartKey(), false, true);
@ -184,8 +196,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator());
this.store.addChangedReaderObserver(this);
InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
StoreScannerCompactionRace.AFTER_SEEK.ordinal()});
}
/**
@ -235,9 +247,13 @@ public class StoreScanner extends NonLazyKeyValueScanner
earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
}
this.store.addChangedReaderObserver(this);
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
scanners = selectScannersFrom(scanners);
InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
// Seek all scanners to the initial key
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {
@ -249,6 +265,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator());
InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
StoreScannerCompactionRace.AFTER_SEEK.ordinal()});
}
/** Constructor for testing. */
@ -280,6 +298,10 @@ public class StoreScanner extends NonLazyKeyValueScanner
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
// In unit tests, the store could be null
if (this.store != null) {
this.store.addChangedReaderObserver(this);
}
// Seek all scanners to the initial key
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {

View File

@ -0,0 +1,32 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
/**
* Enumeration of all injection events.
* When defining new events, please PREFIX the name
* with the supervised class.
*
* Please see InjectionHandler.
*/
public enum InjectionEvent {
// Injection into Store.java
STORESCANNER_COMPACTION_RACE
}

View File

@ -0,0 +1,171 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The InjectionHandler is an object provided to a class,
* which can perform custom actions for JUnit testing.
* JUnit test can implement custom version of the handler.
* For example, let's say we want to supervise FSImage object:
*
* <code>
* // JUnit test code
* class MyInjectionHandler extends InjectionHandler {
* protected void _processEvent(InjectionEvent event,
* Object... args) {
* if (event == InjectionEvent.MY_EVENT) {
* LOG.info("Handling my event for fsImage: "
* + args[0].toString());
* }
* }
* }
*
* public void testMyEvent() {
* InjectionHandler ih = new MyInjectionHandler();
* InjectionHandler.set(ih);
* ...
*
* InjectionHandler.clear();
* }
*
* // supervised code example
*
* class FSImage {
*
* private doSomething() {
* ...
* if (condition1 && InjectionHandler.trueCondition(MY_EVENT1) {
* ...
* }
* if (condition2 || condition3
* || InjectionHandler.falseCondition(MY_EVENT1) {
* ...
* }
* ...
* InjectionHandler.processEvent(MY_EVENT2, this)
* ...
* try {
* read();
* InjectionHandler.processEventIO(MY_EVENT3, this, object);
* // might throw an exception when testing
* catch (IOEXception) {
* LOG.info("Exception")
* }
* ...
* }
* ...
* }
* </code>
*
* Each unit test should use a unique event type.
* The types can be defined by adding them to
* InjectionEvent class.
*
* methods:
*
* // simulate actions
* void processEvent()
* // simulate exceptions
* void processEventIO() throws IOException
*
* // simulate conditions
* boolean trueCondition()
* boolean falseCondition()
*
* The class implementing InjectionHandler must
* override respective protected methods
* _processEvent()
* _processEventIO()
* _trueCondition()
* _falseCondition()
*/
public class InjectionHandler {
private static final Log LOG = LogFactory.getLog(InjectionHandler.class);
// the only handler to which everyone reports
private static InjectionHandler handler = new InjectionHandler();
// can not be instantiated outside, unless a testcase extends it
protected InjectionHandler() {}
// METHODS FOR PRODUCTION CODE
protected void _processEvent(InjectionEvent event, Object... args) {
// by default do nothing
}
protected void _processEventIO(InjectionEvent event, Object... args) throws IOException{
// by default do nothing
}
protected boolean _trueCondition(InjectionEvent event, Object... args) {
return true; // neutral in conjunction
}
protected boolean _falseCondition(InjectionEvent event, Object... args) {
return false; // neutral in alternative
}
////////////////////////////////////////////////////////////
/**
* Set to the empty/production implementation.
*/
public static void clear() {
handler = new InjectionHandler();
}
/**
* Set custom implementation of the handler.
*/
public static void set(InjectionHandler custom) {
LOG.warn("WARNING: SETTING INJECTION HANDLER" +
" - THIS SHOULD NOT BE USED IN PRODUCTION !!!");
handler = custom;
}
/*
* Static methods for reporting to the handler
*/
public static void processEvent(InjectionEvent event, Object... args) {
handler._processEvent(event, args);
}
public static void processEventIO(InjectionEvent event, Object... args)
throws IOException {
handler._processEventIO(event, args);
}
public static boolean trueCondition(InjectionEvent event, Object... args) {
return handler._trueCondition(event, args);
}
public static boolean falseCondition(InjectionEvent event, Object... args) {
return handler._falseCondition(event, args);
}
}

View File

@ -27,18 +27,28 @@ import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import junit.framework.TestCase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.InjectionEvent;
import org.apache.hadoop.hbase.util.InjectionHandler;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.experimental.categories.Category;
// Can't be small as it plays with EnvironmentEdgeManager
@ -501,6 +511,110 @@ public class TestStoreScanner extends TestCase {
assertEquals(false, scanner.next(results));
}
private class StoreScannerCompactionRaceCondition extends InjectionHandler {
final Store store;
Boolean beforeSeek = false;
Boolean afterSeek = false;
Boolean compactionComplete = false;
final int waitTime;
boolean doneSeeking = false;
public Future<Void> f;
StoreScannerCompactionRaceCondition(Store s, int waitTime) {
this.store = s;
this.waitTime = waitTime;
}
protected void _processEvent(InjectionEvent event, Object... args) {
if (event == InjectionEvent.STORESCANNER_COMPACTION_RACE) {
// To prevent other scanners which are not supposed to be tested from taking this code path.
if ((args instanceof Object[]) && (args.length == 1)
&& (args[0] instanceof Integer)) {
StoreScannerCompactionRace sscr = StoreScannerCompactionRace.values()[(Integer)args[0]];
switch (sscr) {
case BEFORE_SEEK :
// Inside StoreScanner ctor before seek.
synchronized (beforeSeek) {
if (!beforeSeek) {
beforeSeek = true;
f = Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
StoreScanner.enableLazySeekGlobally(false);
((HStore)store).compactRecentForTestingAssumingDefaultPolicy(
store.getStorefiles().size() / 2);
StoreScanner.enableLazySeekGlobally(true);
return null;
}
});
Threads.sleep(waitTime);
}
}
break;
case AFTER_SEEK:
// Inside StoreScanner ctor after seek.
synchronized (afterSeek) {
if (!afterSeek) {
afterSeek = true;
this.doneSeeking = true;
}
}
break;
case COMPACT_COMPLETE:
// Inside HStore.completeCompaction
synchronized (compactionComplete) {
if (!compactionComplete) {
compactionComplete = true;
assertTrue(doneSeeking);
}
}
break;
}
}
}
}
}
/*
* Verifies that there is no race condition between StoreScanner construction and compaction.
* This is done through 3 injection points:
* 1. before seek operation in StoreScanner ctor
* 2. after seek operation in StoreScanner ctor
* 3. after compaction completion
*/
public void testCompactionRaceCondition() throws Exception {
HBaseTestingUtility util = new HBaseTestingUtility();
util.startMiniCluster(1);
byte[] t = Bytes.toBytes("tbl"), cf = Bytes.toBytes("cf");
HTable table = util.createTable(t, cf);
util.loadTable(table, cf);
util.flush();
util.loadTable(table, cf);
util.flush();
List<HRegion> regions = util.getHBaseCluster().getRegions(t);
assertTrue(regions.size() == 1);
HRegion r = regions.get(0);
Store s = r.getStore(cf);
// Setup the injection handler.
StoreScannerCompactionRaceCondition ih =
new StoreScannerCompactionRaceCondition(s, 5);
InjectionHandler.set(ih);
// Create a StoreScanner
TreeSet<byte[]> set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
set.add(cf);
Scan scanSpec = new Scan();
scanSpec.setStartRow(Bytes.toBytes("hjfsd"));
scanSpec.setStartRow(Bytes.toBytes("zjfsd"));
KeyValueScanner scanner = s.getScanner(scanSpec, set, s.getSmallestReadPoint());
ih.f.get();
// Clear injection handling and shutdown the minicluster.
InjectionHandler.clear();
scanner.close();
util.shutdownMiniCluster();
}
public void testDeleteMarkerLongevity() throws Exception {
try {
final long now = System.currentTimeMillis();