HBASE-10514 Forward port HBASE-10466, possible data loss when failed flushes
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1577353 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
98cf3e27ee
commit
731e2f6541
|
@ -1030,22 +1030,25 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
status.setStatus("Disabling compacts and flushes for region");
|
||||
boolean wasFlushing;
|
||||
synchronized (writestate) {
|
||||
// Disable compacting and flushing by background threads for this
|
||||
// region.
|
||||
writestate.writesEnabled = false;
|
||||
wasFlushing = writestate.flushing;
|
||||
LOG.debug("Closing " + this + ": disabling compactions & flushes");
|
||||
waitForFlushesAndCompactions();
|
||||
}
|
||||
// If we were not just flushing, is it worth doing a preflush...one
|
||||
// that will clear out of the bulk of the memstore before we put up
|
||||
// the close flag?
|
||||
if (!abort && !wasFlushing && worthPreFlushing()) {
|
||||
if (!abort && worthPreFlushing()) {
|
||||
status.setStatus("Pre-flushing region before close");
|
||||
LOG.info("Running close preflush of " + this.getRegionNameAsString());
|
||||
internalFlushcache(status);
|
||||
try {
|
||||
internalFlushcache(status);
|
||||
} catch (IOException ioe) {
|
||||
// Failed to flush the region. Keep going.
|
||||
status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
this.closing.set(true);
|
||||
|
@ -1061,7 +1064,30 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
LOG.debug("Updates disabled for region " + this);
|
||||
// Don't flush the cache if we are aborting
|
||||
if (!abort) {
|
||||
internalFlushcache(status);
|
||||
int flushCount = 0;
|
||||
while (this.getMemstoreSize().get() > 0) {
|
||||
try {
|
||||
if (flushCount++ > 0) {
|
||||
int actualFlushes = flushCount - 1;
|
||||
if (actualFlushes > 5) {
|
||||
// If we tried 5 times and are unable to clear memory, abort
|
||||
// so we do not lose data
|
||||
throw new DroppedSnapshotException("Failed clearing memory after " +
|
||||
actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
|
||||
}
|
||||
LOG.info("Running extra flush, " + actualFlushes +
|
||||
" (carrying snapshot?) " + this);
|
||||
}
|
||||
internalFlushcache(status);
|
||||
} catch (IOException ioe) {
|
||||
status.setStatus("Failed flush " + this + ", putting online again");
|
||||
synchronized (writestate) {
|
||||
writestate.writesEnabled = true;
|
||||
}
|
||||
// Have to throw to upper layers. I can't abort server from here.
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<byte[], List<StoreFile>> result =
|
||||
|
@ -1075,6 +1101,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// close each store in parallel
|
||||
for (final Store store : stores.values()) {
|
||||
assert abort? true: store.getFlushableSize() == 0;
|
||||
completionService
|
||||
.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
|
||||
@Override
|
||||
|
@ -1104,7 +1131,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
}
|
||||
this.closed.set(true);
|
||||
|
||||
if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
|
||||
if (coprocessorHost != null) {
|
||||
status.setStatus("Running coprocessor post-close hooks");
|
||||
this.coprocessorHost.postClose(abort);
|
||||
|
@ -1608,7 +1635,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
status.setStatus("Obtaining lock to block concurrent updates");
|
||||
// block waiting for the lock for internal flush
|
||||
this.updatesLock.writeLock().lock();
|
||||
long flushsize = this.memstoreSize.get();
|
||||
long totalFlushableSize = 0;
|
||||
status.setStatus("Preparing to flush by snapshotting stores");
|
||||
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
|
||||
long flushSeqId = -1L;
|
||||
|
@ -1630,6 +1657,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
for (Store s : stores.values()) {
|
||||
totalFlushableSize += s.getFlushableSize();
|
||||
storeFlushCtxs.add(s.createFlushContext(flushSeqId));
|
||||
}
|
||||
|
||||
|
@ -1641,7 +1669,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
this.updatesLock.writeLock().unlock();
|
||||
}
|
||||
String s = "Finished memstore snapshotting " + this +
|
||||
", syncing WAL and waiting on mvcc, flushsize=" + flushsize;
|
||||
", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
|
||||
status.setStatus(s);
|
||||
if (LOG.isTraceEnabled()) LOG.trace(s);
|
||||
|
||||
|
@ -1688,7 +1716,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
storeFlushCtxs.clear();
|
||||
|
||||
// Set down the memstore size by amount of flush.
|
||||
this.addAndGetGlobalMemstoreSize(-flushsize);
|
||||
this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
|
||||
} catch (Throwable t) {
|
||||
// An exception here means that the snapshot was not persisted.
|
||||
// The hlog needs to be replayed so its content is restored to memstore.
|
||||
|
@ -1726,7 +1754,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
|
||||
long memstoresize = this.memstoreSize.get();
|
||||
String msg = "Finished memstore flush of ~" +
|
||||
StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
|
||||
StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
|
||||
", currentsize=" +
|
||||
StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
|
||||
" for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
|
||||
|
@ -1734,7 +1762,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
((wal == null)? "; wal=null": "");
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg);
|
||||
this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
|
||||
this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
|
||||
|
||||
return compactionRequested;
|
||||
}
|
||||
|
|
|
@ -368,9 +368,15 @@ public class HStore implements Store {
|
|||
|
||||
@Override
|
||||
public long getMemstoreFlushSize() {
|
||||
// TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack
|
||||
return this.region.memstoreFlushSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFlushableSize() {
|
||||
return this.memstore.getFlushableSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompactionCheckMultiplier() {
|
||||
return this.compactionCheckMultiplier;
|
||||
|
@ -801,7 +807,7 @@ public class HStore implements Store {
|
|||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed flushing store file, retring num=" + i, e);
|
||||
LOG.warn("Failed flushing store file, retrying num=" + i, e);
|
||||
lastException = e;
|
||||
}
|
||||
if (lastException != null && i < (flushRetriesNumber - 1)) {
|
||||
|
|
|
@ -86,6 +86,7 @@ public class MemStore implements HeapSize {
|
|||
|
||||
// Used to track own heapSize
|
||||
final AtomicLong size;
|
||||
private volatile long snapshotSize;
|
||||
|
||||
// Used to track when to flush
|
||||
volatile long timeOfOldestEdit = Long.MAX_VALUE;
|
||||
|
@ -117,6 +118,7 @@ public class MemStore implements HeapSize {
|
|||
timeRangeTracker = new TimeRangeTracker();
|
||||
snapshotTimeRangeTracker = new TimeRangeTracker();
|
||||
this.size = new AtomicLong(DEEP_OVERHEAD);
|
||||
this.snapshotSize = 0;
|
||||
if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
|
||||
this.chunkPool = MemStoreChunkPool.getPool(conf);
|
||||
this.allocator = new MemStoreLAB(conf, chunkPool);
|
||||
|
@ -148,6 +150,7 @@ public class MemStore implements HeapSize {
|
|||
"Doing nothing. Another ongoing flush or did we fail last attempt?");
|
||||
} else {
|
||||
if (!this.kvset.isEmpty()) {
|
||||
this.snapshotSize = keySize();
|
||||
this.snapshot = this.kvset;
|
||||
this.kvset = new KeyValueSkipListSet(this.comparator);
|
||||
this.snapshotTimeRangeTracker = this.timeRangeTracker;
|
||||
|
@ -176,6 +179,18 @@ public class MemStore implements HeapSize {
|
|||
return this.snapshot;
|
||||
}
|
||||
|
||||
/**
|
||||
* On flush, how much memory we will clear.
|
||||
* Flush will first clear out the data in snapshot if any (It will take a second flush
|
||||
* invocation to clear the current Cell set). If snapshot is empty, current
|
||||
* Cell set will be flushed.
|
||||
*
|
||||
* @return size of data that is going to be flushed
|
||||
*/
|
||||
long getFlushableSize() {
|
||||
return this.snapshotSize > 0 ? this.snapshotSize : keySize();
|
||||
}
|
||||
|
||||
/**
|
||||
* The passed snapshot was successfully persisted; it can be let go.
|
||||
* @param ss The snapshot to clean out.
|
||||
|
@ -195,6 +210,7 @@ public class MemStore implements HeapSize {
|
|||
this.snapshot = new KeyValueSkipListSet(this.comparator);
|
||||
this.snapshotTimeRangeTracker = new TimeRangeTracker();
|
||||
}
|
||||
this.snapshotSize = 0;
|
||||
if (this.snapshotAllocator != null) {
|
||||
tmpAllocator = this.snapshotAllocator;
|
||||
this.snapshotAllocator = null;
|
||||
|
@ -983,7 +999,7 @@ public class MemStore implements HeapSize {
|
|||
}
|
||||
|
||||
public final static long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
|
||||
ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
|
||||
|
||||
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
||||
ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
|
||||
|
|
|
@ -254,6 +254,13 @@ public interface Store extends HeapSize, StoreConfigInformation {
|
|||
*/
|
||||
long getMemStoreSize();
|
||||
|
||||
/**
|
||||
* @return The amount of memory we could flush from this memstore; usually this is equal to
|
||||
* {@link #getMemStoreSize()} unless we are carrying snapshots and then it will be the size of
|
||||
* outstanding snapshots.
|
||||
*/
|
||||
long getFlushableSize();
|
||||
|
||||
HColumnDescriptor getFamily();
|
||||
|
||||
/**
|
||||
|
|
|
@ -33,6 +33,8 @@ public interface StoreConfigInformation {
|
|||
/**
|
||||
* @return Gets the Memstore flush size for the region that this store works with.
|
||||
*/
|
||||
// TODO: Why is this in here? It should be in Store and it should return the Store flush size,
|
||||
// not the Regions. St.Ack
|
||||
long getMemstoreFlushSize();
|
||||
|
||||
/**
|
||||
|
|
|
@ -33,7 +33,10 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
@ -41,6 +44,7 @@ import static org.mockito.Mockito.verify;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -65,6 +69,7 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -96,7 +101,6 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
|||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
|
@ -111,12 +115,14 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
|
||||
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
|
@ -136,7 +142,7 @@ import org.mockito.Mockito;
|
|||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Basic stand-alone testing of HRegion.
|
||||
* Basic stand-alone testing of HRegion. No clusters!
|
||||
*
|
||||
* A lot of the meta information for an HRegion now lives inside other HRegions
|
||||
* or in the HBaseMaster, so only basic testing is possible.
|
||||
|
@ -150,12 +156,14 @@ public class TestHRegion {
|
|||
@Rule public TestName name = new TestName();
|
||||
|
||||
private static final String COLUMN_FAMILY = "MyCF";
|
||||
private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
|
||||
|
||||
HRegion region = null;
|
||||
private static HBaseTestingUtility TEST_UTIL; // do not run unit tests in parallel
|
||||
public static Configuration conf ;
|
||||
private String DIR;
|
||||
private static FileSystem fs;
|
||||
// Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
|
||||
private static HBaseTestingUtility TEST_UTIL;
|
||||
public static Configuration CONF ;
|
||||
private String dir;
|
||||
private static FileSystem FILESYSTEM;
|
||||
private final int MAX_VERSIONS = 2;
|
||||
|
||||
// Test names
|
||||
|
@ -174,10 +182,10 @@ public class TestHRegion {
|
|||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
this.TEST_UTIL = HBaseTestingUtility.createLocalHTU();
|
||||
this.fs = TEST_UTIL.getTestFileSystem();
|
||||
this.conf = TEST_UTIL.getConfiguration();
|
||||
this.DIR = TEST_UTIL.getDataTestDir("TestHRegion").toString();
|
||||
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
|
||||
FILESYSTEM = TEST_UTIL.getTestFileSystem();
|
||||
CONF = TEST_UTIL.getConfiguration();
|
||||
dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
|
||||
method = name.getMethodName();
|
||||
tableName = Bytes.toBytes(name.getMethodName());
|
||||
}
|
||||
|
@ -192,17 +200,169 @@ public class TestHRegion {
|
|||
String getName() {
|
||||
return name.getMethodName();
|
||||
}
|
||||
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
// New tests that doesn't spin up a mini cluster but rather just test the
|
||||
// individual code pieces in the HRegion. Putting files locally in
|
||||
// /tmp/testtable
|
||||
// ////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Test for Bug 2 of HBASE-10466.
|
||||
* "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
|
||||
* is smaller than a certain value, or when region close starts a flush is ongoing, the first
|
||||
* flush is skipped and only the second flush takes place. However, two flushes are required in
|
||||
* case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
|
||||
* in current memstore. The fix is removing all conditions except abort check so we ensure 2
|
||||
* flushes for region close."
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test (timeout=60000)
|
||||
public void testCloseCarryingSnapshot() throws IOException {
|
||||
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
|
||||
Store store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||
// Get some random bytes.
|
||||
byte [] value = Bytes.toBytes(name.getMethodName());
|
||||
// Make a random put against our cf.
|
||||
Put put = new Put(value);
|
||||
put.add(COLUMN_FAMILY_BYTES, null, value);
|
||||
// First put something in current memstore, which will be in snapshot after flusher.prepare()
|
||||
region.put(put);
|
||||
StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
|
||||
storeFlushCtx.prepare();
|
||||
// Second put something in current memstore
|
||||
put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
|
||||
region.put(put);
|
||||
// Close with something in memstore and something in the snapshot. Make sure all is cleared.
|
||||
region.close();
|
||||
assertEquals(0, region.getMemstoreSize().get());
|
||||
HRegion.closeHRegion(region);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test we do not lose data if we fail a flush and then close.
|
||||
* Part of HBase-10466. Tests the following from the issue description:
|
||||
* "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
|
||||
* kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
|
||||
* the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
|
||||
* the sum of current memstore sizes instead of snapshots left from previous failed flush. This
|
||||
* calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
|
||||
* gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
|
||||
* in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
|
||||
* much smaller than expected. In extreme case, if the error accumulates to even bigger than
|
||||
* HRegion's memstore size limit, any further flush is skipped because flush does not do anything
|
||||
* if memstoreSize is not larger than 0."
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test (timeout=60000)
|
||||
public void testFlushSizeAccounting() throws Exception {
|
||||
final Configuration conf = HBaseConfiguration.create(CONF);
|
||||
// Only retry once.
|
||||
conf.setInt("hbase.hstore.flush.retries.number", 1);
|
||||
final User user =
|
||||
User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
|
||||
// Inject our faulty LocalFileSystem
|
||||
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
|
||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||
public Object run() throws Exception {
|
||||
// Make sure it worked (above is sensitive to caching details in hadoop core)
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
FaultyFileSystem ffs = (FaultyFileSystem)fs;
|
||||
HRegion region = null;
|
||||
try {
|
||||
// Initialize region
|
||||
region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
|
||||
long size = region.getMemstoreSize().get();
|
||||
Assert.assertEquals(0, size);
|
||||
// Put one item into memstore. Measure the size of one item in memstore.
|
||||
Put p1 = new Put(row);
|
||||
p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
|
||||
region.put(p1);
|
||||
final long sizeOfOnePut = region.getMemstoreSize().get();
|
||||
// Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
|
||||
try {
|
||||
LOG.info("Flushing");
|
||||
region.flushcache();
|
||||
Assert.fail("Didn't bubble up IOE!");
|
||||
} catch (DroppedSnapshotException dse) {
|
||||
// What we are expecting
|
||||
}
|
||||
// Make it so all writes succeed from here on out
|
||||
ffs.fault.set(false);
|
||||
// Check sizes. Should still be the one entry.
|
||||
Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize().get());
|
||||
// Now add two entries so that on this next flush that fails, we can see if we
|
||||
// subtract the right amount, the snapshot size only.
|
||||
Put p2 = new Put(row);
|
||||
p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
|
||||
p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
|
||||
region.put(p2);
|
||||
Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize().get());
|
||||
// Do a successful flush. It will clear the snapshot only. Thats how flushes work.
|
||||
// If already a snapshot, we clear it else we move the memstore to be snapshot and flush
|
||||
// it
|
||||
region.flushcache();
|
||||
// Make sure our memory accounting is right.
|
||||
Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize().get());
|
||||
} finally {
|
||||
HRegion.closeHRegion(region);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
FileSystem.closeAllForUGI(user.getUGI());
|
||||
}
|
||||
|
||||
@Test (timeout=60000)
|
||||
public void testCloseWithFailingFlush() throws Exception {
|
||||
final Configuration conf = HBaseConfiguration.create(CONF);
|
||||
// Only retry once.
|
||||
conf.setInt("hbase.hstore.flush.retries.number", 1);
|
||||
final User user =
|
||||
User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
|
||||
// Inject our faulty LocalFileSystem
|
||||
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
|
||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||
public Object run() throws Exception {
|
||||
// Make sure it worked (above is sensitive to caching details in hadoop core)
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
FaultyFileSystem ffs = (FaultyFileSystem)fs;
|
||||
HRegion region = null;
|
||||
try {
|
||||
// Initialize region
|
||||
region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
|
||||
long size = region.getMemstoreSize().get();
|
||||
Assert.assertEquals(0, size);
|
||||
// Put one item into memstore. Measure the size of one item in memstore.
|
||||
Put p1 = new Put(row);
|
||||
p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
|
||||
region.put(p1);
|
||||
// Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
|
||||
Store store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||
StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
|
||||
storeFlushCtx.prepare();
|
||||
// Now add two entries to the foreground memstore.
|
||||
Put p2 = new Put(row);
|
||||
p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
|
||||
p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
|
||||
region.put(p2);
|
||||
// Now try close on top of a failing flush.
|
||||
region.close();
|
||||
fail();
|
||||
} catch (DroppedSnapshotException dse) {
|
||||
// Expected
|
||||
LOG.info("Expected DroppedSnapshotException");
|
||||
} finally {
|
||||
// Make it so all writes succeed from here on out so can close clean
|
||||
ffs.fault.set(false);
|
||||
HRegion.closeHRegion(region);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
FileSystem.closeAllForUGI(user.getUGI());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionAffectedByScanners() throws Exception {
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
|
||||
Put put = new Put(Bytes.toBytes("r1"));
|
||||
put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
|
||||
|
@ -250,7 +410,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testToShowNPEOnRegionScannerReseek() throws Exception {
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
|
||||
Put put = new Put(Bytes.toBytes("r1"));
|
||||
put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
|
||||
|
@ -282,7 +442,7 @@ public class TestHRegion {
|
|||
String method = "testSkipRecoveredEditsReplay";
|
||||
TableName tableName = TableName.valueOf(method);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
try {
|
||||
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
@ -296,7 +456,7 @@ public class TestHRegion {
|
|||
for (long i = minSeqId; i <= maxSeqId; i += 10) {
|
||||
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
|
||||
fs.create(recoveredEdits);
|
||||
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
|
||||
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
|
||||
|
||||
long time = System.nanoTime();
|
||||
WALEdit edit = new WALEdit();
|
||||
|
@ -332,7 +492,7 @@ public class TestHRegion {
|
|||
String method = "testSkipRecoveredEditsReplaySomeIgnored";
|
||||
TableName tableName = TableName.valueOf(method);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
try {
|
||||
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
@ -346,7 +506,7 @@ public class TestHRegion {
|
|||
for (long i = minSeqId; i <= maxSeqId; i += 10) {
|
||||
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
|
||||
fs.create(recoveredEdits);
|
||||
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
|
||||
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
|
||||
|
||||
long time = System.nanoTime();
|
||||
WALEdit edit = new WALEdit();
|
||||
|
@ -385,7 +545,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
try {
|
||||
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
@ -419,7 +579,7 @@ public class TestHRegion {
|
|||
String method = name.getMethodName();
|
||||
TableName tableName = TableName.valueOf(method);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
try {
|
||||
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
@ -443,7 +603,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
// disable compaction completion
|
||||
conf.setBoolean("hbase.hstore.compaction.complete", false);
|
||||
CONF.setBoolean("hbase.hstore.compaction.complete", false);
|
||||
region.compactStores();
|
||||
|
||||
// ensure that nothing changed
|
||||
|
@ -471,7 +631,7 @@ public class TestHRegion {
|
|||
|
||||
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
|
||||
fs.create(recoveredEdits);
|
||||
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf);
|
||||
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
|
||||
|
||||
long time = System.nanoTime();
|
||||
|
||||
|
@ -480,8 +640,8 @@ public class TestHRegion {
|
|||
writer.close();
|
||||
|
||||
// close the region now, and reopen again
|
||||
HTableDescriptor htd = region.getTableDesc();
|
||||
HRegionInfo info = region.getRegionInfo();
|
||||
region.getTableDesc();
|
||||
region.getRegionInfo();
|
||||
region.close();
|
||||
region = HRegion.openHRegion(region, null);
|
||||
|
||||
|
@ -602,7 +762,7 @@ public class TestHRegion {
|
|||
byte[] TABLE = Bytes.toBytes("testWeirdCacheBehaviour");
|
||||
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
|
||||
Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
|
||||
this.region = initHRegion(TABLE, getName(), conf, FAMILIES);
|
||||
this.region = initHRegion(TABLE, getName(), CONF, FAMILIES);
|
||||
try {
|
||||
String value = "this is the value";
|
||||
String value2 = "this is some other value";
|
||||
|
@ -643,7 +803,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testAppendWithReadOnlyTable() throws Exception {
|
||||
byte[] TABLE = Bytes.toBytes("readOnlyTable");
|
||||
this.region = initHRegion(TABLE, getName(), conf, true, Bytes.toBytes("somefamily"));
|
||||
this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
|
||||
boolean exceptionCaught = false;
|
||||
Append append = new Append(Bytes.toBytes("somerow"));
|
||||
append.setDurability(Durability.SKIP_WAL);
|
||||
|
@ -663,7 +823,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testIncrWithReadOnlyTable() throws Exception {
|
||||
byte[] TABLE = Bytes.toBytes("readOnlyTable");
|
||||
this.region = initHRegion(TABLE, getName(), conf, true, Bytes.toBytes("somefamily"));
|
||||
this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
|
||||
boolean exceptionCaught = false;
|
||||
Increment inc = new Increment(Bytes.toBytes("somerow"));
|
||||
inc.setDurability(Durability.SKIP_WAL);
|
||||
|
@ -756,7 +916,7 @@ public class TestHRegion {
|
|||
public void testFamilyWithAndWithoutColon() throws Exception {
|
||||
byte[] b = Bytes.toBytes(getName());
|
||||
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
|
||||
this.region = initHRegion(b, getName(), conf, cf);
|
||||
this.region = initHRegion(b, getName(), CONF, cf);
|
||||
try {
|
||||
Put p = new Put(b);
|
||||
byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
|
||||
|
@ -780,7 +940,7 @@ public class TestHRegion {
|
|||
byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
|
||||
byte[] qual = Bytes.toBytes("qual");
|
||||
byte[] val = Bytes.toBytes("val");
|
||||
this.region = initHRegion(b, getName(), conf, cf);
|
||||
this.region = initHRegion(b, getName(), CONF, cf);
|
||||
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
||||
try {
|
||||
long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
|
||||
|
@ -814,7 +974,7 @@ public class TestHRegion {
|
|||
LOG.info("Next a batch put that has to break into two batches to avoid a lock");
|
||||
RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
|
||||
|
||||
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
|
||||
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
|
||||
final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
|
||||
TestThread putter = new TestThread(ctx) {
|
||||
@Override
|
||||
|
@ -860,8 +1020,8 @@ public class TestHRegion {
|
|||
byte[] val = Bytes.toBytes("val");
|
||||
|
||||
// add data with a timestamp that is too recent for range. Ensure assert
|
||||
conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
||||
this.region = initHRegion(b, getName(), conf, cf);
|
||||
CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
||||
this.region = initHRegion(b, getName(), CONF, cf);
|
||||
|
||||
try {
|
||||
MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
|
||||
|
@ -902,7 +1062,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Putting empty data in key
|
||||
Put put = new Put(row1);
|
||||
|
@ -976,7 +1136,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Putting data in key
|
||||
Put put = new Put(row1);
|
||||
|
@ -1009,7 +1169,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Putting data in key
|
||||
Put put = new Put(row1);
|
||||
|
@ -1045,7 +1205,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Putting val3 in key
|
||||
Put put = new Put(row1);
|
||||
|
@ -1141,7 +1301,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Putting data in the key to check
|
||||
Put put = new Put(row1);
|
||||
|
@ -1182,12 +1342,12 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testCheckAndPut_wrongRowInPut() throws IOException {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
this.region = initHRegion(tableName, this.getName(), conf, COLUMNS);
|
||||
this.region = initHRegion(tableName, this.getName(), CONF, COLUMNS);
|
||||
try {
|
||||
Put put = new Put(row2);
|
||||
put.add(fam1, qual1, value1);
|
||||
try {
|
||||
boolean res = region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
|
||||
region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
|
||||
new BinaryComparator(value2), put, false);
|
||||
fail();
|
||||
} catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
|
||||
|
@ -1216,7 +1376,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Put content
|
||||
Put put = new Put(row1);
|
||||
|
@ -1291,7 +1451,7 @@ public class TestHRegion {
|
|||
put.add(fam1, qual, 2, value);
|
||||
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
region.put(put);
|
||||
|
||||
|
@ -1321,7 +1481,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1, fam2, fam3);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
|
||||
try {
|
||||
List<Cell> kvs = new ArrayList<Cell>();
|
||||
kvs.add(new KeyValue(row1, fam4, null, null));
|
||||
|
@ -1360,7 +1520,7 @@ public class TestHRegion {
|
|||
byte[] fam = Bytes.toBytes("info");
|
||||
byte[][] families = { fam };
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
|
||||
|
||||
|
@ -1428,7 +1588,7 @@ public class TestHRegion {
|
|||
byte[] fam = Bytes.toBytes("info");
|
||||
byte[][] families = { fam };
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
byte[] row = Bytes.toBytes("table_name");
|
||||
// column names
|
||||
|
@ -1471,7 +1631,7 @@ public class TestHRegion {
|
|||
byte[] fam = Bytes.toBytes("info");
|
||||
byte[][] families = { fam };
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
byte[] row = Bytes.toBytes("row1");
|
||||
// column names
|
||||
|
@ -1525,8 +1685,8 @@ public class TestHRegion {
|
|||
String method = this.getName();
|
||||
|
||||
// add data with a timestamp that is too recent for range. Ensure assert
|
||||
conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
boolean caughtExcep = false;
|
||||
try {
|
||||
try {
|
||||
|
@ -1551,7 +1711,7 @@ public class TestHRegion {
|
|||
public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
|
||||
byte[] fam1 = Bytes.toBytes("columnA");
|
||||
byte[] fam2 = Bytes.toBytes("columnB");
|
||||
this.region = initHRegion(tableName, getName(), conf, fam1, fam2);
|
||||
this.region = initHRegion(tableName, getName(), CONF, fam1, fam2);
|
||||
try {
|
||||
byte[] rowA = Bytes.toBytes("rowA");
|
||||
byte[] rowB = Bytes.toBytes("rowB");
|
||||
|
@ -1605,7 +1765,7 @@ public class TestHRegion {
|
|||
|
||||
public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
this.region = initHRegion(tableName, getName(), conf, fam1);
|
||||
this.region = initHRegion(tableName, getName(), CONF, fam1);
|
||||
try {
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
|
||||
Put put = new Put(row);
|
||||
|
@ -1658,7 +1818,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Building checkerList
|
||||
List<Cell> kvs = new ArrayList<Cell>();
|
||||
|
@ -1699,7 +1859,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
Get get = new Get(row1);
|
||||
get.addColumn(fam2, col1);
|
||||
|
@ -1730,7 +1890,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Add to memstore
|
||||
Put put = new Put(row1);
|
||||
|
@ -1776,7 +1936,7 @@ public class TestHRegion {
|
|||
byte[] fam = Bytes.toBytes("fam");
|
||||
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam);
|
||||
this.region = initHRegion(tableName, method, CONF, fam);
|
||||
try {
|
||||
Get get = new Get(row);
|
||||
get.addFamily(fam);
|
||||
|
@ -1820,9 +1980,9 @@ public class TestHRegion {
|
|||
region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
|
||||
LOG.info("Merge regions elapsed time: "
|
||||
+ ((System.currentTimeMillis() - startTime) / 1000.0));
|
||||
fs.delete(oldRegion1, true);
|
||||
fs.delete(oldRegion2, true);
|
||||
fs.delete(oldRegionPath, true);
|
||||
FILESYSTEM.delete(oldRegion1, true);
|
||||
FILESYSTEM.delete(oldRegion2, true);
|
||||
FILESYSTEM.delete(oldRegionPath, true);
|
||||
LOG.info("splitAndMerge completed.");
|
||||
} finally {
|
||||
for (int i = 0; i < subregions.length; i++) {
|
||||
|
@ -1884,7 +2044,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(fam1);
|
||||
|
@ -1909,7 +2069,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(fam2);
|
||||
|
@ -1938,7 +2098,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
|
||||
// Putting data in Region
|
||||
|
@ -1985,7 +2145,7 @@ public class TestHRegion {
|
|||
// Setting up region
|
||||
String method = this.getName();
|
||||
try {
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
fail("Got IOException during initHRegion, " + e.getMessage());
|
||||
|
@ -2021,7 +2181,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Putting data in Region
|
||||
Put put = null;
|
||||
|
@ -2087,7 +2247,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Putting data in Region
|
||||
Put put = null;
|
||||
|
@ -2146,7 +2306,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Putting data in Region
|
||||
Put put = null;
|
||||
|
@ -2210,7 +2370,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Putting data in Region
|
||||
KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
|
||||
|
@ -2291,7 +2451,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
// Putting data in Region
|
||||
Put put = null;
|
||||
|
@ -2351,7 +2511,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Putting data in Region
|
||||
Put put = null;
|
||||
|
@ -2402,7 +2562,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testScanner_StopRow1542() throws IOException {
|
||||
byte[] family = Bytes.toBytes("testFamily");
|
||||
this.region = initHRegion(tableName, getName(), conf, family);
|
||||
this.region = initHRegion(tableName, getName(), CONF, family);
|
||||
try {
|
||||
byte[] row1 = Bytes.toBytes("row111");
|
||||
byte[] row2 = Bytes.toBytes("row222");
|
||||
|
@ -2447,19 +2607,6 @@ public class TestHRegion {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, int amount)
|
||||
throws IOException {
|
||||
// run a get and see?
|
||||
Get get = new Get(row);
|
||||
get.addColumn(familiy, qualifier);
|
||||
Result result = region.get(get);
|
||||
assertEquals(1, result.size());
|
||||
|
||||
Cell kv = result.rawCells()[0];
|
||||
int r = Bytes.toInt(CellUtil.cloneValue(kv));
|
||||
assertEquals(amount, r);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
|
@ -2474,7 +2621,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = this.getName();
|
||||
this.region = initHRegion(tableName, method, conf, fam1);
|
||||
this.region = initHRegion(tableName, method, CONF, fam1);
|
||||
try {
|
||||
// Putting data in Region
|
||||
KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
|
||||
|
@ -2550,7 +2697,7 @@ public class TestHRegion {
|
|||
byte[] cf_essential = Bytes.toBytes("essential");
|
||||
byte[] cf_joined = Bytes.toBytes("joined");
|
||||
byte[] cf_alpha = Bytes.toBytes("alpha");
|
||||
this.region = initHRegion(tableName, getName(), conf, cf_essential, cf_joined, cf_alpha);
|
||||
this.region = initHRegion(tableName, getName(), CONF, cf_essential, cf_joined, cf_alpha);
|
||||
try {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] row2 = Bytes.toBytes("row2");
|
||||
|
@ -2618,7 +2765,7 @@ public class TestHRegion {
|
|||
final byte[] cf_first = Bytes.toBytes("first");
|
||||
final byte[] cf_second = Bytes.toBytes("second");
|
||||
|
||||
this.region = initHRegion(tableName, getName(), conf, cf_first, cf_second);
|
||||
this.region = initHRegion(tableName, getName(), CONF, cf_first, cf_second);
|
||||
try {
|
||||
final byte[] col_a = Bytes.toBytes("a");
|
||||
final byte[] col_b = Bytes.toBytes("b");
|
||||
|
@ -2854,7 +3001,7 @@ public class TestHRegion {
|
|||
int compactInterval = 10 * flushAndScanInterval;
|
||||
|
||||
String method = "testFlushCacheWhileScanning";
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
try {
|
||||
FlushThread flushThread = new FlushThread();
|
||||
flushThread.start();
|
||||
|
@ -2986,7 +3133,7 @@ public class TestHRegion {
|
|||
}
|
||||
|
||||
String method = "testWritesWhileScanning";
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
try {
|
||||
PutThread putThread = new PutThread(numRows, families, qualifiers);
|
||||
putThread.start();
|
||||
|
@ -3149,6 +3296,7 @@ public class TestHRegion {
|
|||
// extending over the ulimit. Make sure compactions are aggressive in
|
||||
// reducing
|
||||
// the number of HFiles created.
|
||||
Configuration conf = HBaseConfiguration.create(CONF);
|
||||
conf.setInt("hbase.hstore.compaction.min", 1);
|
||||
conf.setInt("hbase.hstore.compaction.max", 1000);
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
|
@ -3238,7 +3386,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testHolesInMeta() throws Exception {
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, conf,
|
||||
this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
|
||||
false, family);
|
||||
try {
|
||||
byte[] rowNotServed = Bytes.toBytes("a");
|
||||
|
@ -3264,7 +3412,7 @@ public class TestHRegion {
|
|||
|
||||
// Setting up region
|
||||
String method = "testIndexesScanWithOneDeletedRow";
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
try {
|
||||
Put put = new Put(Bytes.toBytes(1L));
|
||||
put.add(family, qual1, 1L, Bytes.toBytes(1L));
|
||||
|
@ -3302,7 +3450,6 @@ public class TestHRegion {
|
|||
// ////////////////////////////////////////////////////////////////////////////
|
||||
@Test
|
||||
public void testBloomFilterSize() throws IOException {
|
||||
byte[] row1 = Bytes.toBytes("row1");
|
||||
byte[] fam1 = Bytes.toBytes("fam1");
|
||||
byte[] qf1 = Bytes.toBytes("col");
|
||||
byte[] val1 = Bytes.toBytes("value1");
|
||||
|
@ -3519,8 +3666,8 @@ public class TestHRegion {
|
|||
htd.addFamily(new HColumnDescriptor("cf"));
|
||||
info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.EMPTY_BYTE_ARRAY, false);
|
||||
Path path = new Path(DIR + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
|
||||
region = HRegion.newHRegion(path, null, fs, conf, info, htd, null);
|
||||
Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
|
||||
region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
|
||||
// region initialization throws IOException and set task state to ABORTED.
|
||||
region.initialize();
|
||||
fail("Region initialization should fail due to IOException");
|
||||
|
@ -3545,7 +3692,7 @@ public class TestHRegion {
|
|||
*/
|
||||
@Test
|
||||
public void testRegionInfoFileCreation() throws IOException {
|
||||
Path rootDir = new Path(DIR + "testRegionInfoFileCreation");
|
||||
Path rootDir = new Path(dir + "testRegionInfoFileCreation");
|
||||
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testtb"));
|
||||
htd.addFamily(new HColumnDescriptor("cf"));
|
||||
|
@ -3553,7 +3700,7 @@ public class TestHRegion {
|
|||
HRegionInfo hri = new HRegionInfo(htd.getTableName());
|
||||
|
||||
// Create a region and skip the initialization (like CreateTableHandler)
|
||||
HRegion region = HRegion.createHRegion(hri, rootDir, conf, htd, null, false, true);
|
||||
HRegion region = HRegion.createHRegion(hri, rootDir, CONF, htd, null, false, true);
|
||||
// HRegion region = TEST_UTIL.createLocalHRegion(hri, htd);
|
||||
Path regionDir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
@ -3566,7 +3713,7 @@ public class TestHRegion {
|
|||
fs.exists(regionInfoFile));
|
||||
|
||||
// Try to open the region
|
||||
region = HRegion.openHRegion(rootDir, hri, htd, null, conf);
|
||||
region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
|
||||
assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
|
||||
HRegion.closeHRegion(region);
|
||||
|
||||
|
@ -3579,7 +3726,7 @@ public class TestHRegion {
|
|||
assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
|
||||
fs.exists(regionInfoFile));
|
||||
|
||||
region = HRegion.openHRegion(rootDir, hri, htd, null, conf);
|
||||
region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
|
||||
// region = TEST_UTIL.openHRegion(hri, htd);
|
||||
assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
|
||||
HRegion.closeHRegion(region);
|
||||
|
@ -3629,7 +3776,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testParallelIncrementWithMemStoreFlush() throws Exception {
|
||||
byte[] family = Incrementer.family;
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
final HRegion region = this.region;
|
||||
final AtomicBoolean incrementDone = new AtomicBoolean(false);
|
||||
Runnable flusher = new Runnable() {
|
||||
|
@ -3716,7 +3863,7 @@ public class TestHRegion {
|
|||
@Test
|
||||
public void testParallelAppendWithMemStoreFlush() throws Exception {
|
||||
byte[] family = Appender.family;
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
final HRegion region = this.region;
|
||||
final AtomicBoolean appendDone = new AtomicBoolean(false);
|
||||
Runnable flusher = new Runnable() {
|
||||
|
@ -3780,7 +3927,7 @@ public class TestHRegion {
|
|||
byte[] qualifier = Bytes.toBytes("qualifier");
|
||||
byte[] row = Bytes.toBytes("putRow");
|
||||
byte[] value = null;
|
||||
this.region = initHRegion(tableName, method, conf, family);
|
||||
this.region = initHRegion(tableName, method, CONF, family);
|
||||
Put put = null;
|
||||
Get get = null;
|
||||
List<Cell> kvs = null;
|
||||
|
@ -3883,11 +4030,12 @@ public class TestHRegion {
|
|||
private void durabilityTest(String method, Durability tableDurability,
|
||||
Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
|
||||
final boolean expectSyncFromLogSyncer) throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create(CONF);
|
||||
method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
|
||||
TableName tableName = TableName.valueOf(method);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
Path logDir = new Path(new Path(DIR + method), "log");
|
||||
HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf);
|
||||
Path logDir = new Path(new Path(dir + method), "log");
|
||||
HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), conf);
|
||||
final HLog log = spy(hlog);
|
||||
this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, log,
|
||||
|
@ -3924,8 +4072,8 @@ public class TestHRegion {
|
|||
verify(log, never()).sync();
|
||||
}
|
||||
|
||||
hlog.close();
|
||||
region.close();
|
||||
HRegion.closeHRegion(this.region);
|
||||
this.region = null;
|
||||
}
|
||||
|
||||
private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
|
||||
|
@ -4010,20 +4158,20 @@ public class TestHRegion {
|
|||
|
||||
private Configuration initSplit() {
|
||||
// Always compact if there is more than one store file.
|
||||
conf.setInt("hbase.hstore.compactionThreshold", 2);
|
||||
CONF.setInt("hbase.hstore.compactionThreshold", 2);
|
||||
|
||||
// Make lease timeout longer, lease checks less frequent
|
||||
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
|
||||
CONF.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
|
||||
|
||||
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
|
||||
CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
|
||||
|
||||
// Increase the amount of time between client retries
|
||||
conf.setLong("hbase.client.pause", 15 * 1000);
|
||||
CONF.setLong("hbase.client.pause", 15 * 1000);
|
||||
|
||||
// This size should make it so we always split using the addContent
|
||||
// below. After adding all data, the first region is 1.3M
|
||||
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
|
||||
return conf;
|
||||
CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
|
||||
return CONF;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,7 +42,7 @@ public class TestHRegionBusyWait extends TestHRegion {
|
|||
@Before
|
||||
public void setup() throws IOException {
|
||||
super.setup();
|
||||
conf.set("hbase.busy.wait.duration", "1000");
|
||||
CONF.set("hbase.busy.wait.duration", "1000");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -53,7 +53,7 @@ public class TestHRegionBusyWait extends TestHRegion {
|
|||
String method = "testRegionTooBusy";
|
||||
byte[] tableName = Bytes.toBytes(method);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
region = initHRegion(tableName, method, conf, family);
|
||||
region = initHRegion(tableName, method, CONF, family);
|
||||
final AtomicBoolean stopped = new AtomicBoolean(true);
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
|
|
|
@ -29,8 +29,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -75,15 +74,22 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
|||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Test class for the Store
|
||||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestStore extends TestCase {
|
||||
public class TestStore {
|
||||
public static final Log LOG = LogFactory.getLog(TestStore.class);
|
||||
@Rule public TestName name = new TestName();
|
||||
|
||||
HStore store;
|
||||
byte [] table = Bytes.toBytes("table");
|
||||
|
@ -115,7 +121,7 @@ public class TestStore extends TestCase {
|
|||
* Setup
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
qualifiers.add(qf1);
|
||||
qualifiers.add(qf3);
|
||||
|
@ -149,7 +155,7 @@ public class TestStore extends TestCase {
|
|||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void init(String methodName, Configuration conf, HTableDescriptor htd,
|
||||
private Store init(String methodName, Configuration conf, HTableDescriptor htd,
|
||||
HColumnDescriptor hcd) throws IOException {
|
||||
//Setting up a Store
|
||||
Path basedir = new Path(DIR+methodName);
|
||||
|
@ -167,12 +173,73 @@ public class TestStore extends TestCase {
|
|||
HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
|
||||
|
||||
store = new HStore(region, hcd, conf);
|
||||
return store;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test we do not lose data if we fail a flush and then close.
|
||||
* Part of HBase-10466
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testFlushSizeAccounting() throws Exception {
|
||||
LOG.info("Setting up a faulty file system that cannot write in " +
|
||||
this.name.getMethodName());
|
||||
final Configuration conf = HBaseConfiguration.create();
|
||||
// Only retry once.
|
||||
conf.setInt("hbase.hstore.flush.retries.number", 1);
|
||||
User user = User.createUserForTesting(conf, this.name.getMethodName(),
|
||||
new String[]{"foo"});
|
||||
// Inject our faulty LocalFileSystem
|
||||
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
|
||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||
public Object run() throws Exception {
|
||||
// Make sure it worked (above is sensitive to caching details in hadoop core)
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
FaultyFileSystem ffs = (FaultyFileSystem)fs;
|
||||
|
||||
// Initialize region
|
||||
init(name.getMethodName(), conf);
|
||||
|
||||
long size = store.memstore.getFlushableSize();
|
||||
Assert.assertEquals(0, size);
|
||||
LOG.info("Adding some data");
|
||||
long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
size = store.memstore.getFlushableSize();
|
||||
Assert.assertEquals(kvSize, size);
|
||||
// Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right.
|
||||
try {
|
||||
LOG.info("Flushing");
|
||||
flushStore(store, id++);
|
||||
Assert.fail("Didn't bubble up IOE!");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
|
||||
}
|
||||
size = store.memstore.getFlushableSize();
|
||||
Assert.assertEquals(kvSize, size);
|
||||
store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
|
||||
// Even though we add a new kv, we expect the flushable size to be 'same' since we have
|
||||
// not yet cleared the snapshot -- the above flush failed.
|
||||
Assert.assertEquals(kvSize, size);
|
||||
ffs.fault.set(false);
|
||||
flushStore(store, id++);
|
||||
size = store.memstore.getFlushableSize();
|
||||
// Size should be the foreground kv size.
|
||||
Assert.assertEquals(kvSize, size);
|
||||
flushStore(store, id++);
|
||||
size = store.memstore.getFlushableSize();
|
||||
Assert.assertEquals(0, size);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that compression and data block encoding are respected by the
|
||||
* Store.createWriterInTmp() method, used on store flush.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateWriter() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
|
@ -180,7 +247,7 @@ public class TestStore extends TestCase {
|
|||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setCompressionType(Compression.Algorithm.GZ);
|
||||
hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
|
||||
init(getName(), conf, hcd);
|
||||
init(name.getMethodName(), conf, hcd);
|
||||
|
||||
// Test createWriterInTmp()
|
||||
StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
|
||||
|
@ -193,11 +260,12 @@ public class TestStore extends TestCase {
|
|||
|
||||
// Verify that compression and encoding settings are respected
|
||||
HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
|
||||
assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
|
||||
assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
|
||||
Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
|
||||
Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
|
||||
reader.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteExpiredStoreFiles() throws Exception {
|
||||
int storeFileNum = 4;
|
||||
int ttl = 4;
|
||||
|
@ -209,7 +277,7 @@ public class TestStore extends TestCase {
|
|||
conf.setBoolean("hbase.store.delete.expired.storefile", true);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setTimeToLive(ttl);
|
||||
init(getName(), conf, hcd);
|
||||
init(name.getMethodName(), conf, hcd);
|
||||
|
||||
long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
|
||||
long timeStamp;
|
||||
|
@ -226,7 +294,7 @@ public class TestStore extends TestCase {
|
|||
}
|
||||
|
||||
// Verify the total number of store files
|
||||
assertEquals(storeFileNum, this.store.getStorefiles().size());
|
||||
Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
|
||||
|
||||
// Each compaction request will find one expired store file and delete it
|
||||
// by the compaction.
|
||||
|
@ -237,27 +305,28 @@ public class TestStore extends TestCase {
|
|||
// the first is expired normally.
|
||||
// If not the first compaction, there is another empty store file,
|
||||
List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
|
||||
assertEquals(Math.min(i, 2), cr.getFiles().size());
|
||||
Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
|
||||
for (int j = 0; j < files.size(); j++) {
|
||||
assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
|
||||
Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
|
||||
.currentTimeMillis() - this.store.getScanInfo().getTtl()));
|
||||
}
|
||||
// Verify that the expired store file is compacted to an empty store file.
|
||||
// Default compaction policy creates just one and only one compacted file.
|
||||
StoreFile compactedFile = this.store.compact(compaction).get(0);
|
||||
// It is an empty store file.
|
||||
assertEquals(0, compactedFile.getReader().getEntries());
|
||||
Assert.assertEquals(0, compactedFile.getReader().getEntries());
|
||||
|
||||
// Let the next store file expired.
|
||||
edge.incrementTime(sleepTime);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLowestModificationTime() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
// Initialize region
|
||||
init(getName(), conf);
|
||||
init(name.getMethodName(), conf);
|
||||
|
||||
int storeFileNum = 4;
|
||||
for (int i = 1; i <= storeFileNum; i++) {
|
||||
|
@ -270,13 +339,13 @@ public class TestStore extends TestCase {
|
|||
// after flush; check the lowest time stamp
|
||||
long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
|
||||
long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
|
||||
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
|
||||
|
||||
// after compact; check the lowest time stamp
|
||||
store.compact(store.requestCompaction());
|
||||
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
|
||||
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
|
||||
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
|
||||
}
|
||||
|
||||
private static long getLowestTimeStampFromFS(FileSystem fs,
|
||||
|
@ -311,8 +380,9 @@ public class TestStore extends TestCase {
|
|||
* Test for hbase-1686.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyStoreFile() throws IOException {
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
// Write a store file.
|
||||
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
|
||||
|
@ -335,20 +405,21 @@ public class TestStore extends TestCase {
|
|||
this.store.close();
|
||||
// Reopen it... should pick up two files
|
||||
this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
|
||||
assertEquals(2, this.store.getStorefilesCount());
|
||||
Assert.assertEquals(2, this.store.getStorefilesCount());
|
||||
|
||||
result = HBaseTestingUtility.getFromStoreFile(store,
|
||||
get.getRow(),
|
||||
qualifiers);
|
||||
assertEquals(1, result.size());
|
||||
Assert.assertEquals(1, result.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Getting data from memstore only
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testGet_FromMemStoreOnly() throws IOException {
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
//Put data in memstore
|
||||
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
|
@ -370,8 +441,9 @@ public class TestStore extends TestCase {
|
|||
* Getting data from files only
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testGet_FromFilesOnly() throws IOException {
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
//Put data in memstore
|
||||
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
|
@ -408,8 +480,9 @@ public class TestStore extends TestCase {
|
|||
* Getting data from memstore and files
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testGet_FromMemStoreAndFiles() throws IOException {
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
//Put data in memstore
|
||||
this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
|
@ -441,14 +514,14 @@ public class TestStore extends TestCase {
|
|||
private void flush(int storeFilessize) throws IOException{
|
||||
this.store.snapshot();
|
||||
flushStore(store, id++);
|
||||
assertEquals(storeFilessize, this.store.getStorefiles().size());
|
||||
assertEquals(0, this.store.memstore.kvset.size());
|
||||
Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
|
||||
Assert.assertEquals(0, this.store.memstore.kvset.size());
|
||||
}
|
||||
|
||||
private void assertCheck() {
|
||||
assertEquals(expected.size(), result.size());
|
||||
Assert.assertEquals(expected.size(), result.size());
|
||||
for(int i=0; i<expected.size(); i++) {
|
||||
assertEquals(expected.get(i), result.get(i));
|
||||
Assert.assertEquals(expected.get(i), result.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -458,9 +531,10 @@ public class TestStore extends TestCase {
|
|||
/*
|
||||
* test the internal details of how ICV works, especially during a flush scenario.
|
||||
*/
|
||||
@Test
|
||||
public void testIncrementColumnValue_ICVDuringFlush()
|
||||
throws IOException, InterruptedException {
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
long oldValue = 1L;
|
||||
long newValue = 3L;
|
||||
|
@ -480,13 +554,13 @@ public class TestStore extends TestCase {
|
|||
long ret = this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
// memstore should have grown by some amount.
|
||||
assertTrue(ret > 0);
|
||||
Assert.assertTrue(ret > 0);
|
||||
|
||||
// then flush.
|
||||
flushStore(store, id++);
|
||||
assertEquals(1, this.store.getStorefiles().size());
|
||||
Assert.assertEquals(1, this.store.getStorefiles().size());
|
||||
// from the one we inserted up there, and a new one
|
||||
assertEquals(2, this.store.memstore.kvset.size());
|
||||
Assert.assertEquals(2, this.store.memstore.kvset.size());
|
||||
|
||||
// how many key/values for this row are there?
|
||||
Get get = new Get(row);
|
||||
|
@ -495,25 +569,25 @@ public class TestStore extends TestCase {
|
|||
List<Cell> results = new ArrayList<Cell>();
|
||||
|
||||
results = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertEquals(2, results.size());
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
long ts1 = results.get(0).getTimestamp();
|
||||
long ts2 = results.get(1).getTimestamp();
|
||||
|
||||
assertTrue(ts1 > ts2);
|
||||
Assert.assertTrue(ts1 > ts2);
|
||||
|
||||
assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
|
||||
assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
|
||||
Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
|
||||
Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
EnvironmentEdgeManagerTestHelper.reset();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testICV_negMemstoreSize() throws IOException {
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
long time = 100;
|
||||
ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
|
||||
|
@ -549,9 +623,9 @@ public class TestStore extends TestCase {
|
|||
if (ret != 0) System.out.println("ret: " + ret);
|
||||
if (ret2 != 0) System.out.println("ret2: " + ret2);
|
||||
|
||||
assertTrue("ret: " + ret, ret >= 0);
|
||||
Assert.assertTrue("ret: " + ret, ret >= 0);
|
||||
size += ret;
|
||||
assertTrue("ret2: " + ret2, ret2 >= 0);
|
||||
Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
|
||||
size += ret2;
|
||||
|
||||
|
||||
|
@ -565,13 +639,14 @@ public class TestStore extends TestCase {
|
|||
//System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
|
||||
computedSize += kvsize;
|
||||
}
|
||||
assertEquals(computedSize, size);
|
||||
Assert.assertEquals(computedSize, size);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
|
||||
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(mee);
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
long oldValue = 1L;
|
||||
long newValue = 3L;
|
||||
|
@ -586,12 +661,12 @@ public class TestStore extends TestCase {
|
|||
long ret = this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
// memstore should have grown by some amount.
|
||||
assertTrue(ret > 0);
|
||||
Assert.assertTrue(ret > 0);
|
||||
|
||||
// then flush.
|
||||
flushStore(store, id++);
|
||||
assertEquals(1, this.store.getStorefiles().size());
|
||||
assertEquals(1, this.store.memstore.kvset.size());
|
||||
Assert.assertEquals(1, this.store.getStorefiles().size());
|
||||
Assert.assertEquals(1, this.store.memstore.kvset.size());
|
||||
|
||||
// now increment again:
|
||||
newValue += 1;
|
||||
|
@ -611,30 +686,31 @@ public class TestStore extends TestCase {
|
|||
List<Cell> results = new ArrayList<Cell>();
|
||||
|
||||
results = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertEquals(2, results.size());
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
long ts1 = results.get(0).getTimestamp();
|
||||
long ts2 = results.get(1).getTimestamp();
|
||||
|
||||
assertTrue(ts1 > ts2);
|
||||
assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
|
||||
assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
|
||||
Assert.assertTrue(ts1 > ts2);
|
||||
Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
|
||||
Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
|
||||
|
||||
mee.setValue(2); // time goes up slightly
|
||||
newValue += 1;
|
||||
this.store.updateColumnValue(row, family, qf1, newValue);
|
||||
|
||||
results = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertEquals(2, results.size());
|
||||
Assert.assertEquals(2, results.size());
|
||||
|
||||
ts1 = results.get(0).getTimestamp();
|
||||
ts2 = results.get(1).getTimestamp();
|
||||
|
||||
assertTrue(ts1 > ts2);
|
||||
assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
|
||||
assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
|
||||
Assert.assertTrue(ts1 > ts2);
|
||||
Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
|
||||
Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleErrorsInFlush() throws Exception {
|
||||
LOG.info("Setting up a faulty file system that cannot write");
|
||||
|
||||
|
@ -648,10 +724,10 @@ public class TestStore extends TestCase {
|
|||
public Object run() throws Exception {
|
||||
// Make sure it worked (above is sensitive to caching details in hadoop core)
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
|
||||
|
||||
// Initialize region
|
||||
init(getName(), conf);
|
||||
init(name.getMethodName(), conf);
|
||||
|
||||
LOG.info("Adding some data");
|
||||
store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
|
||||
|
@ -662,30 +738,36 @@ public class TestStore extends TestCase {
|
|||
|
||||
Collection<StoreFileInfo> files =
|
||||
store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
|
||||
assertEquals(0, files != null ? files.size() : 0);
|
||||
Assert.assertEquals(0, files != null ? files.size() : 0);
|
||||
|
||||
//flush
|
||||
try {
|
||||
LOG.info("Flushing");
|
||||
flush(1);
|
||||
fail("Didn't bubble up IOE!");
|
||||
Assert.fail("Didn't bubble up IOE!");
|
||||
} catch (IOException ioe) {
|
||||
assertTrue(ioe.getMessage().contains("Fault injected"));
|
||||
Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
|
||||
}
|
||||
|
||||
LOG.info("After failed flush, we should still have no files!");
|
||||
files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
|
||||
assertEquals(0, files != null ? files.size() : 0);
|
||||
Assert.assertEquals(0, files != null ? files.size() : 0);
|
||||
store.getHRegion().getLog().closeAndDelete();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
FileSystem.closeAllForUGI(user.getUGI());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Faulty file system that will fail if you write past its fault position the FIRST TIME
|
||||
* only; thereafter it will succeed. Used by {@link TestHRegion} too.
|
||||
*/
|
||||
static class FaultyFileSystem extends FilterFileSystem {
|
||||
List<SoftReference<FaultyOutputStream>> outStreams =
|
||||
new ArrayList<SoftReference<FaultyOutputStream>>();
|
||||
private long faultPos = 200;
|
||||
AtomicBoolean fault = new AtomicBoolean(true);
|
||||
|
||||
public FaultyFileSystem() {
|
||||
super(new LocalFileSystem());
|
||||
|
@ -694,7 +776,7 @@ public class TestStore extends TestCase {
|
|||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path p) throws IOException {
|
||||
return new FaultyOutputStream(super.create(p), faultPos);
|
||||
return new FaultyOutputStream(super.create(p), faultPos, fault);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -702,7 +784,7 @@ public class TestStore extends TestCase {
|
|||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
return new FaultyOutputStream(super.create(f, permission,
|
||||
overwrite, bufferSize, replication, blockSize, progress), faultPos);
|
||||
overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
|
||||
}
|
||||
|
||||
public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
|
||||
|
@ -716,11 +798,13 @@ public class TestStore extends TestCase {
|
|||
|
||||
static class FaultyOutputStream extends FSDataOutputStream {
|
||||
volatile long faultPos = Long.MAX_VALUE;
|
||||
private final AtomicBoolean fault;
|
||||
|
||||
public FaultyOutputStream(FSDataOutputStream out,
|
||||
long faultPos) throws IOException {
|
||||
public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
|
||||
throws IOException {
|
||||
super(out, null);
|
||||
this.faultPos = faultPos;
|
||||
this.fault = fault;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -731,14 +815,12 @@ public class TestStore extends TestCase {
|
|||
}
|
||||
|
||||
private void injectFault() throws IOException {
|
||||
if (getPos() >= faultPos) {
|
||||
if (this.fault.get() && getPos() >= faultPos) {
|
||||
throw new IOException("Fault injected");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static void flushStore(HStore store, long id) throws IOException {
|
||||
StoreFlushContext storeFlushCtx = store.createFlushContext(id);
|
||||
storeFlushCtx.prepare();
|
||||
|
@ -746,8 +828,6 @@ public class TestStore extends TestCase {
|
|||
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Generate a list of KeyValues for testing based on given parameters
|
||||
* @param timestamps
|
||||
|
@ -772,12 +852,13 @@ public class TestStore extends TestCase {
|
|||
* Test to ensure correctness when using Stores with multiple timestamps
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleTimestamps() throws IOException {
|
||||
int numRows = 1;
|
||||
long[] timestamps1 = new long[] {1,5,10,20};
|
||||
long[] timestamps2 = new long[] {30,80};
|
||||
|
||||
init(this.getName());
|
||||
init(this.name.getMethodName());
|
||||
|
||||
List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
|
||||
for (Cell kv : kvList1) {
|
||||
|
@ -798,27 +879,27 @@ public class TestStore extends TestCase {
|
|||
|
||||
get.setTimeRange(0,15);
|
||||
result = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertTrue(result.size()>0);
|
||||
Assert.assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(40,90);
|
||||
result = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertTrue(result.size()>0);
|
||||
Assert.assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(10,45);
|
||||
result = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertTrue(result.size()>0);
|
||||
Assert.assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(80,145);
|
||||
result = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertTrue(result.size()>0);
|
||||
Assert.assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(1,2);
|
||||
result = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertTrue(result.size()>0);
|
||||
Assert.assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(90,200);
|
||||
result = HBaseTestingUtility.getFromStoreFile(store, get);
|
||||
assertTrue(result.size()==0);
|
||||
Assert.assertTrue(result.size()==0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -826,14 +907,16 @@ public class TestStore extends TestCase {
|
|||
*
|
||||
* @throws IOException When the IO operations fail.
|
||||
*/
|
||||
@Test
|
||||
public void testSplitWithEmptyColFam() throws IOException {
|
||||
init(this.getName());
|
||||
assertNull(store.getSplitPoint());
|
||||
init(this.name.getMethodName());
|
||||
Assert.assertNull(store.getSplitPoint());
|
||||
store.getHRegion().forceSplit(null);
|
||||
assertNull(store.getSplitPoint());
|
||||
Assert.assertNull(store.getSplitPoint());
|
||||
store.getHRegion().clearSplit_TESTS_ONLY();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
|
||||
final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
|
||||
long anyValue = 10;
|
||||
|
@ -843,25 +926,25 @@ public class TestStore extends TestCase {
|
|||
// a number we pass in is higher than some config value, inside compactionPolicy.
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setLong(CONFIG_KEY, anyValue);
|
||||
init(getName() + "-xml", conf);
|
||||
assertTrue(store.throttleCompaction(anyValue + 1));
|
||||
assertFalse(store.throttleCompaction(anyValue));
|
||||
init(name.getMethodName() + "-xml", conf);
|
||||
Assert.assertTrue(store.throttleCompaction(anyValue + 1));
|
||||
Assert.assertFalse(store.throttleCompaction(anyValue));
|
||||
|
||||
// HTD overrides XML.
|
||||
--anyValue;
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
|
||||
init(getName() + "-htd", conf, htd, hcd);
|
||||
assertTrue(store.throttleCompaction(anyValue + 1));
|
||||
assertFalse(store.throttleCompaction(anyValue));
|
||||
init(name.getMethodName() + "-htd", conf, htd, hcd);
|
||||
Assert.assertTrue(store.throttleCompaction(anyValue + 1));
|
||||
Assert.assertFalse(store.throttleCompaction(anyValue));
|
||||
|
||||
// HCD overrides them both.
|
||||
--anyValue;
|
||||
hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
|
||||
init(getName() + "-hcd", conf, htd, hcd);
|
||||
assertTrue(store.throttleCompaction(anyValue + 1));
|
||||
assertFalse(store.throttleCompaction(anyValue));
|
||||
init(name.getMethodName() + "-hcd", conf, htd, hcd);
|
||||
Assert.assertTrue(store.throttleCompaction(anyValue + 1));
|
||||
Assert.assertFalse(store.throttleCompaction(anyValue));
|
||||
}
|
||||
|
||||
public static class DummyStoreEngine extends DefaultStoreEngine {
|
||||
|
@ -874,11 +957,12 @@ public class TestStore extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreUsesSearchEngineOverride() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
|
||||
init(this.getName(), conf);
|
||||
assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
|
||||
init(this.name.getMethodName(), conf);
|
||||
Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
|
||||
this.store.storeEngine.getCompactor());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue