HBASE-11678 BucketCache ramCache fills heap after running a few hours
This commit is contained in:
parent
5848710aa7
commit
5e899c68e0
|
@ -73,7 +73,7 @@ public class HeapMemorySizeUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve global memstore configured size as percentage of total heap.
|
* Retrieve global memstore configured size as percentage of total heap.
|
||||||
* @param conf
|
* @param c
|
||||||
* @param logInvalid
|
* @param logInvalid
|
||||||
*/
|
*/
|
||||||
public static float getGlobalMemStorePercent(final Configuration c, final boolean logInvalid) {
|
public static float getGlobalMemStorePercent(final Configuration c, final boolean logInvalid) {
|
||||||
|
@ -91,7 +91,7 @@ public class HeapMemorySizeUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve configured size for global memstore lower water mark as percentage of total heap.
|
* Retrieve configured size for global memstore lower water mark as percentage of total heap.
|
||||||
* @param conf
|
* @param c
|
||||||
* @param globalMemStorePercent
|
* @param globalMemStorePercent
|
||||||
*/
|
*/
|
||||||
public static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
|
public static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
|
||||||
|
|
|
@ -503,12 +503,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
if(bytesToFree <= 0) return;
|
if(bytesToFree <= 0) return;
|
||||||
|
|
||||||
// Instantiate priority buckets
|
// Instantiate priority buckets
|
||||||
BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
|
BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
|
||||||
singleSize());
|
BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
|
||||||
BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
|
BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
|
||||||
multiSize());
|
|
||||||
BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
|
|
||||||
memorySize());
|
|
||||||
|
|
||||||
// Scan entire map putting into appropriate buckets
|
// Scan entire map putting into appropriate buckets
|
||||||
for(LruCachedBlock cachedBlock : map.values()) {
|
for(LruCachedBlock cachedBlock : map.values()) {
|
||||||
|
@ -606,7 +603,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* to configuration parameters and their relatives sizes.
|
* to configuration parameters and their relatives sizes.
|
||||||
*/
|
*/
|
||||||
private class BlockBucket implements Comparable<BlockBucket> {
|
private class BlockBucket implements Comparable<BlockBucket> {
|
||||||
|
|
||||||
private LruCachedBlockQueue queue;
|
private LruCachedBlockQueue queue;
|
||||||
private long totalSize = 0;
|
private long totalSize = 0;
|
||||||
private long bucketSize;
|
private long bucketSize;
|
||||||
|
@ -652,10 +648,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
if (that == null || !(that instanceof BlockBucket)){
|
if (that == null || !(that instanceof BlockBucket)){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
return compareTo((BlockBucket)that) == 0;
|
||||||
return compareTo(( BlockBucket)that) == 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
// Nothing distingushing about each instance unless I pass in a 'name' or something
|
||||||
|
return super.hashCode();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -714,18 +714,20 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
while (this.go) {
|
while (this.go) {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
try {
|
try {
|
||||||
this.wait();
|
this.wait(1000 * 10/*Don't wait for ever*/);
|
||||||
} catch(InterruptedException e) {}
|
} catch(InterruptedException e) {}
|
||||||
}
|
}
|
||||||
LruBlockCache cache = this.cache.get();
|
LruBlockCache cache = this.cache.get();
|
||||||
if(cache == null) break;
|
if (cache == null) break;
|
||||||
cache.evict();
|
cache.evict();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
||||||
|
justification="This is what we want")
|
||||||
public void evict() {
|
public void evict() {
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
this.notifyAll(); // FindBugs NN_NAKED_NOTIFY
|
this.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -872,6 +874,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
return (int)(other.getCachedTime() - this.getCachedTime());
|
return (int)(other.getCachedTime() - this.getCachedTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return b.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (obj instanceof CachedBlock) {
|
if (obj instanceof CachedBlock) {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
|
||||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||||
|
|
||||||
|
@ -415,7 +416,9 @@ public final class BucketAllocator {
|
||||||
assert blockSize > 0;
|
assert blockSize > 0;
|
||||||
BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
|
BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
|
||||||
if (bsi == null) {
|
if (bsi == null) {
|
||||||
throw new BucketAllocatorException("Allocation too big size=" + blockSize);
|
throw new BucketAllocatorException("Allocation too big size=" + blockSize +
|
||||||
|
"; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
|
||||||
|
" to accomodate if size seems reasonable and you want it cached.");
|
||||||
}
|
}
|
||||||
long offset = bsi.allocateBlock();
|
long offset = bsi.allocateBlock();
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.HasThread;
|
||||||
import org.apache.hadoop.hbase.util.IdLock;
|
import org.apache.hadoop.hbase.util.IdLock;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
@ -112,7 +113,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
IOEngine ioEngine;
|
IOEngine ioEngine;
|
||||||
|
|
||||||
// Store the block in this map before writing it to cache
|
// Store the block in this map before writing it to cache
|
||||||
private Map<BlockCacheKey, RAMQueueEntry> ramCache;
|
@VisibleForTesting
|
||||||
|
Map<BlockCacheKey, RAMQueueEntry> ramCache;
|
||||||
// In this map, store the block's meta data like offset, length
|
// In this map, store the block's meta data like offset, length
|
||||||
private Map<BlockCacheKey, BucketEntry> backingMap;
|
private Map<BlockCacheKey, BucketEntry> backingMap;
|
||||||
|
|
||||||
|
@ -123,8 +125,17 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
*/
|
*/
|
||||||
private volatile boolean cacheEnabled;
|
private volatile boolean cacheEnabled;
|
||||||
|
|
||||||
private ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
|
/**
|
||||||
|
* A list of writer queues. We have a queue per {@link WriterThread} we have running.
|
||||||
|
* In other words, the work adding blocks to the BucketCache is divided up amongst the
|
||||||
|
* running WriterThreads. Its done by taking hash of the cache key modulo queue count.
|
||||||
|
* WriterThread when it runs takes whatever has been recently added and 'drains' the entries
|
||||||
|
* to the BucketCache. It then updates the ramCache and backingMap accordingly.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
|
||||||
new ArrayList<BlockingQueue<RAMQueueEntry>>();
|
new ArrayList<BlockingQueue<RAMQueueEntry>>();
|
||||||
|
@VisibleForTesting
|
||||||
WriterThread writerThreads[];
|
WriterThread writerThreads[];
|
||||||
|
|
||||||
/** Volatile boolean to track if free space is in process or not */
|
/** Volatile boolean to track if free space is in process or not */
|
||||||
|
@ -161,6 +172,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
private final int ioErrorsTolerationDuration;
|
private final int ioErrorsTolerationDuration;
|
||||||
// 1 min
|
// 1 min
|
||||||
public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
|
public static final int DEFAULT_ERROR_TOLERATION_DURATION = 60 * 1000;
|
||||||
|
|
||||||
// Start time of first IO error when reading or writing IO Engine, it will be
|
// Start time of first IO error when reading or writing IO Engine, it will be
|
||||||
// reset after a successful read/write.
|
// reset after a successful read/write.
|
||||||
private volatile long ioErrorStartTime = -1;
|
private volatile long ioErrorStartTime = -1;
|
||||||
|
@ -245,6 +257,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
for (int i = 0; i < writerThreads.length; ++i) {
|
for (int i = 0; i < writerThreads.length; ++i) {
|
||||||
writerThreads[i] = new WriterThread(writerQueues.get(i), i);
|
writerThreads[i] = new WriterThread(writerQueues.get(i), i);
|
||||||
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
|
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
|
||||||
|
writerThreads[i].setDaemon(true);
|
||||||
writerThreads[i].start();
|
writerThreads[i].start();
|
||||||
}
|
}
|
||||||
// Run the statistics thread periodically to print the cache statistics log
|
// Run the statistics thread periodically to print the cache statistics log
|
||||||
|
@ -259,6 +272,11 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
persistencePath + ", bucketAllocator=" + this.bucketAllocator);
|
persistencePath + ", bucketAllocator=" + this.bucketAllocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isCacheEnabled() {
|
||||||
|
return this.cacheEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
public long getMaxSize() {
|
public long getMaxSize() {
|
||||||
return this.cacheCapacity;
|
return this.cacheCapacity;
|
||||||
}
|
}
|
||||||
|
@ -526,44 +544,45 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
* Free the space if the used size reaches acceptableSize() or one size block
|
* Free the space if the used size reaches acceptableSize() or one size block
|
||||||
* couldn't be allocated. When freeing the space, we use the LRU algorithm and
|
* couldn't be allocated. When freeing the space, we use the LRU algorithm and
|
||||||
* ensure there must be some blocks evicted
|
* ensure there must be some blocks evicted
|
||||||
|
* @param why Why we are being called
|
||||||
*/
|
*/
|
||||||
private void freeSpace() {
|
private void freeSpace(final String why) {
|
||||||
// Ensure only one freeSpace progress at a time
|
// Ensure only one freeSpace progress at a time
|
||||||
if (!freeSpaceLock.tryLock()) return;
|
if (!freeSpaceLock.tryLock()) return;
|
||||||
try {
|
try {
|
||||||
freeInProgress = true;
|
freeInProgress = true;
|
||||||
long bytesToFreeWithoutExtra = 0;
|
long bytesToFreeWithoutExtra = 0;
|
||||||
/*
|
// Calculate free byte for each bucketSizeinfo
|
||||||
* Calculate free byte for each bucketSizeinfo
|
StringBuffer msgBuffer = LOG.isDebugEnabled()? new StringBuffer(): null;
|
||||||
*/
|
|
||||||
StringBuffer msgBuffer = new StringBuffer();
|
|
||||||
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
|
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
|
||||||
long[] bytesToFreeForBucket = new long[stats.length];
|
long[] bytesToFreeForBucket = new long[stats.length];
|
||||||
for (int i = 0; i < stats.length; i++) {
|
for (int i = 0; i < stats.length; i++) {
|
||||||
bytesToFreeForBucket[i] = 0;
|
bytesToFreeForBucket[i] = 0;
|
||||||
long freeGoal = (long) Math.floor(stats[i].totalCount()
|
long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
|
||||||
* (1 - DEFAULT_MIN_FACTOR));
|
|
||||||
freeGoal = Math.max(freeGoal, 1);
|
freeGoal = Math.max(freeGoal, 1);
|
||||||
if (stats[i].freeCount() < freeGoal) {
|
if (stats[i].freeCount() < freeGoal) {
|
||||||
bytesToFreeForBucket[i] = stats[i].itemSize()
|
bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal - stats[i].freeCount());
|
||||||
* (freeGoal - stats[i].freeCount());
|
|
||||||
bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
|
bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
|
||||||
|
if (msgBuffer != null) {
|
||||||
msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
|
msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
|
||||||
+ StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
|
+ StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
msgBuffer.append("Free for total="
|
}
|
||||||
+ StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
|
if (msgBuffer != null) {
|
||||||
|
msgBuffer.append("Free for total=" + StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
|
||||||
|
}
|
||||||
|
|
||||||
if (bytesToFreeWithoutExtra <= 0) {
|
if (bytesToFreeWithoutExtra <= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
long currentSize = bucketAllocator.getUsedSize();
|
long currentSize = bucketAllocator.getUsedSize();
|
||||||
long totalSize=bucketAllocator.getTotalSize();
|
long totalSize=bucketAllocator.getTotalSize();
|
||||||
LOG.debug("Bucket cache free space started; Attempting to " + msgBuffer.toString()
|
if (LOG.isDebugEnabled() && msgBuffer != null) {
|
||||||
+ " of current used=" + StringUtils.byteDesc(currentSize)
|
LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
|
||||||
+ ",actual cacheSize=" + StringUtils.byteDesc(realCacheSize.get())
|
" of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
|
||||||
+ ",total=" + StringUtils.byteDesc(totalSize));
|
StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize));
|
||||||
|
}
|
||||||
|
|
||||||
long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
|
long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
|
||||||
* (1 + DEFAULT_EXTRA_FREE_FACTOR));
|
* (1 + DEFAULT_EXTRA_FREE_FACTOR));
|
||||||
|
@ -622,8 +641,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
stats = bucketAllocator.getIndexStatistics();
|
stats = bucketAllocator.getIndexStatistics();
|
||||||
boolean needFreeForExtra = false;
|
boolean needFreeForExtra = false;
|
||||||
for (int i = 0; i < stats.length; i++) {
|
for (int i = 0; i < stats.length; i++) {
|
||||||
long freeGoal = (long) Math.floor(stats[i].totalCount()
|
long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 - DEFAULT_MIN_FACTOR));
|
||||||
* (1 - DEFAULT_MIN_FACTOR));
|
|
||||||
freeGoal = Math.max(freeGoal, 1);
|
freeGoal = Math.max(freeGoal, 1);
|
||||||
if (stats[i].freeCount() < freeGoal) {
|
if (stats[i].freeCount() < freeGoal) {
|
||||||
needFreeForExtra = true;
|
needFreeForExtra = true;
|
||||||
|
@ -639,8 +657,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
bucketQueue.add(bucketMulti);
|
bucketQueue.add(bucketMulti);
|
||||||
|
|
||||||
while ((bucketGroup = bucketQueue.poll()) != null) {
|
while ((bucketGroup = bucketQueue.poll()) != null) {
|
||||||
long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed)
|
long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) / remainingBuckets;
|
||||||
/ remainingBuckets;
|
|
||||||
bytesFreed += bucketGroup.free(bucketBytesToFree);
|
bytesFreed += bucketGroup.free(bucketBytesToFree);
|
||||||
remainingBuckets--;
|
remainingBuckets--;
|
||||||
}
|
}
|
||||||
|
@ -650,6 +667,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
long single = bucketSingle.totalSize();
|
long single = bucketSingle.totalSize();
|
||||||
long multi = bucketMulti.totalSize();
|
long multi = bucketMulti.totalSize();
|
||||||
long memory = bucketMemory.totalSize();
|
long memory = bucketMemory.totalSize();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Bucket cache free space completed; " + "freed="
|
LOG.debug("Bucket cache free space completed; " + "freed="
|
||||||
+ StringUtils.byteDesc(bytesFreed) + ", " + "total="
|
+ StringUtils.byteDesc(bytesFreed) + ", " + "total="
|
||||||
+ StringUtils.byteDesc(totalSize) + ", " + "single="
|
+ StringUtils.byteDesc(totalSize) + ", " + "single="
|
||||||
|
@ -657,6 +675,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
+ StringUtils.byteDesc(multi) + ", " + "memory="
|
+ StringUtils.byteDesc(multi) + ", " + "memory="
|
||||||
+ StringUtils.byteDesc(memory));
|
+ StringUtils.byteDesc(memory));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Failed freeing space", t);
|
LOG.warn("Failed freeing space", t);
|
||||||
|
@ -668,19 +687,20 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
// This handles flushing the RAM cache to IOEngine.
|
// This handles flushing the RAM cache to IOEngine.
|
||||||
private class WriterThread extends HasThread {
|
@VisibleForTesting
|
||||||
BlockingQueue<RAMQueueEntry> inputQueue;
|
class WriterThread extends HasThread {
|
||||||
final int threadNO;
|
private final BlockingQueue<RAMQueueEntry> inputQueue;
|
||||||
boolean writerEnabled = true;
|
private final int threadNO;
|
||||||
|
private volatile boolean writerEnabled = true;
|
||||||
|
|
||||||
WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
|
WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
|
||||||
super();
|
super();
|
||||||
this.inputQueue = queue;
|
this.inputQueue = queue;
|
||||||
this.threadNO = threadNO;
|
this.threadNO = threadNO;
|
||||||
setDaemon(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Used for test
|
// Used for test
|
||||||
|
@VisibleForTesting
|
||||||
void disableWriter() {
|
void disableWriter() {
|
||||||
this.writerEnabled = false;
|
this.writerEnabled = false;
|
||||||
}
|
}
|
||||||
|
@ -692,8 +712,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
// Blocks
|
// Blocks
|
||||||
entries.add(inputQueue.take());
|
entries = getRAMQueueEntries(inputQueue, entries);
|
||||||
inputQueue.drainTo(entries);
|
|
||||||
synchronized (cacheWaitSignals[threadNO]) {
|
synchronized (cacheWaitSignals[threadNO]) {
|
||||||
cacheWaitSignals[threadNO].notifyAll();
|
cacheWaitSignals[threadNO].notifyAll();
|
||||||
}
|
}
|
||||||
|
@ -712,80 +731,120 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush the entries in ramCache to IOEngine and add bucket entry to
|
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
|
||||||
* backingMap
|
* Process all that are passed in even if failure being sure to remove from ramCache else we'll
|
||||||
* @param entries
|
* never undo the references and we'll OOME.
|
||||||
|
* @param entries Presumes list passed in here will be processed by this invocation only. No
|
||||||
|
* interference expected.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
private void doDrain(List<RAMQueueEntry> entries)
|
@VisibleForTesting
|
||||||
throws InterruptedException {
|
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
|
||||||
BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
|
if (entries.isEmpty()) return;
|
||||||
RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
|
// This method is a little hard to follow. We run through the passed in entries and for each
|
||||||
int done = 0;
|
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
|
||||||
while (entries.size() > 0 && cacheEnabled) {
|
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we
|
||||||
// Keep going in case we throw...
|
// successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
|
||||||
RAMQueueEntry ramEntry = null;
|
// filling ramCache. We do the clean up by again running through the passed in entries
|
||||||
|
// doing extra work when we find a non-null bucketEntries corresponding entry.
|
||||||
|
final int size = entries.size();
|
||||||
|
BucketEntry[] bucketEntries = new BucketEntry[size];
|
||||||
|
// Index updated inside loop if success or if we can't succeed. We retry if cache is full
|
||||||
|
// when we go to add an entry by going around the loop again without upping the index.
|
||||||
|
int index = 0;
|
||||||
|
while (cacheEnabled && index < size) {
|
||||||
|
RAMQueueEntry re = null;
|
||||||
try {
|
try {
|
||||||
ramEntry = entries.remove(entries.size() - 1);
|
re = entries.get(index);
|
||||||
if (ramEntry == null) {
|
if (re == null) {
|
||||||
LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
|
LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
|
||||||
|
index++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
|
BucketEntry bucketEntry =
|
||||||
bucketAllocator, deserialiserMap, realCacheSize);
|
re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
|
||||||
ramEntries[done] = ramEntry;
|
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||||
bucketEntries[done++] = bucketEntry;
|
bucketEntries[index] = bucketEntry;
|
||||||
if (ioErrorStartTime > 0) {
|
if (ioErrorStartTime > 0) {
|
||||||
ioErrorStartTime = -1;
|
ioErrorStartTime = -1;
|
||||||
}
|
}
|
||||||
|
index++;
|
||||||
} catch (BucketAllocatorException fle) {
|
} catch (BucketAllocatorException fle) {
|
||||||
LOG.warn("Failed allocating for block "
|
LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
|
||||||
+ (ramEntry == null ? "" : ramEntry.getKey()), fle);
|
// Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
|
||||||
|
bucketEntries[index] = null;
|
||||||
|
index++;
|
||||||
} catch (CacheFullException cfe) {
|
} catch (CacheFullException cfe) {
|
||||||
|
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
|
||||||
if (!freeInProgress) {
|
if (!freeInProgress) {
|
||||||
freeSpace();
|
freeSpace("Full!");
|
||||||
} else {
|
} else {
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
}
|
}
|
||||||
} catch (IOException ioex) {
|
} catch (IOException ioex) {
|
||||||
|
// Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
|
||||||
LOG.error("Failed writing to bucket cache", ioex);
|
LOG.error("Failed writing to bucket cache", ioex);
|
||||||
checkIOErrorIsTolerated();
|
checkIOErrorIsTolerated();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure that the data pages we have written are on the media before
|
// Make sure data pages are written are on media before we update maps.
|
||||||
// we update the map.
|
|
||||||
try {
|
try {
|
||||||
ioEngine.sync();
|
ioEngine.sync();
|
||||||
} catch (IOException ioex) {
|
} catch (IOException ioex) {
|
||||||
LOG.error("Faild syncing IO engine", ioex);
|
LOG.error("Failed syncing IO engine", ioex);
|
||||||
checkIOErrorIsTolerated();
|
checkIOErrorIsTolerated();
|
||||||
// Since we failed sync, free the blocks in bucket allocator
|
// Since we failed sync, free the blocks in bucket allocator
|
||||||
for (int i = 0; i < done; ++i) {
|
for (int i = 0; i < entries.size(); ++i) {
|
||||||
if (bucketEntries[i] != null) {
|
if (bucketEntries[i] != null) {
|
||||||
bucketAllocator.freeBlock(bucketEntries[i].offset());
|
bucketAllocator.freeBlock(bucketEntries[i].offset());
|
||||||
|
bucketEntries[i] = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < done; ++i) {
|
// Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
|
||||||
|
// success or error.
|
||||||
|
for (int i = 0; i < size; ++i) {
|
||||||
|
BlockCacheKey key = entries.get(i).getKey();
|
||||||
|
// Only add if non-null entry.
|
||||||
if (bucketEntries[i] != null) {
|
if (bucketEntries[i] != null) {
|
||||||
backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
|
backingMap.put(key, bucketEntries[i]);
|
||||||
}
|
}
|
||||||
RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
|
// Always remove from ramCache even if we failed adding it to the block cache above.
|
||||||
|
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
|
||||||
if (ramCacheEntry != null) {
|
if (ramCacheEntry != null) {
|
||||||
heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
|
heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bucketAllocator.getUsedSize() > acceptableSize()) {
|
long used = bucketAllocator.getUsedSize();
|
||||||
freeSpace();
|
if (used > acceptableSize()) {
|
||||||
|
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocks until elements available in <code>q</code> then tries to grab as many as possible
|
||||||
|
* before returning.
|
||||||
|
* @param recepticle Where to stash the elements taken from queue. We clear before we use it
|
||||||
|
* just in case.
|
||||||
|
* @param q The queue to take from.
|
||||||
|
* @return <code>receptical laden with elements taken from the queue or empty if none found.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
|
||||||
|
final List<RAMQueueEntry> receptical)
|
||||||
|
throws InterruptedException {
|
||||||
|
// Clear sets all entries to null and sets size to 0. We retain allocations. Presume it
|
||||||
|
// ok even if list grew to accommodate thousands.
|
||||||
|
receptical.clear();
|
||||||
|
receptical.add(q.take());
|
||||||
|
q.drainTo(receptical);
|
||||||
|
return receptical;
|
||||||
|
}
|
||||||
|
|
||||||
private void persistToFile() throws IOException {
|
private void persistToFile() throws IOException {
|
||||||
assert !cacheEnabled;
|
assert !cacheEnabled;
|
||||||
|
@ -863,11 +922,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
private void checkIOErrorIsTolerated() {
|
private void checkIOErrorIsTolerated() {
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
if (this.ioErrorStartTime > 0) {
|
if (this.ioErrorStartTime > 0) {
|
||||||
if (cacheEnabled
|
if (cacheEnabled && (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
|
||||||
&& (now - ioErrorStartTime) > this.ioErrorsTolerationDuration) {
|
LOG.error("IO errors duration time has exceeded " + ioErrorsTolerationDuration +
|
||||||
LOG.error("IO errors duration time has exceeded "
|
"ms, disabing cache, please check your IOEngine");
|
||||||
+ ioErrorsTolerationDuration
|
|
||||||
+ "ms, disabing cache, please check your IOEngine");
|
|
||||||
disableCache();
|
disableCache();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1128,7 +1185,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
/**
|
/**
|
||||||
* Block Entry stored in the memory with key,data and so on
|
* Block Entry stored in the memory with key,data and so on
|
||||||
*/
|
*/
|
||||||
private static class RAMQueueEntry {
|
@VisibleForTesting
|
||||||
|
static class RAMQueueEntry {
|
||||||
private BlockCacheKey key;
|
private BlockCacheKey key;
|
||||||
private Cacheable data;
|
private Cacheable data;
|
||||||
private long accessTime;
|
private long accessTime;
|
||||||
|
@ -1163,8 +1221,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
// This cacheable thing can't be serialized...
|
// This cacheable thing can't be serialized...
|
||||||
if (len == 0) return null;
|
if (len == 0) return null;
|
||||||
long offset = bucketAllocator.allocateBlock(len);
|
long offset = bucketAllocator.allocateBlock(len);
|
||||||
BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
|
BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
|
||||||
inMemory);
|
|
||||||
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
|
||||||
try {
|
try {
|
||||||
if (data instanceof HFileBlock) {
|
if (data instanceof HFileBlock) {
|
||||||
|
@ -1269,6 +1326,11 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
return (int)(other.getCachedTime() - this.getCachedTime());
|
return (int)(other.getCachedTime() - this.getCachedTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return e.getKey().hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (obj instanceof CachedBlock) {
|
if (obj instanceof CachedBlock) {
|
||||||
|
|
|
@ -0,0 +1,170 @@
|
||||||
|
/**
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestBucketWriterThread {
|
||||||
|
private BucketCache bc;
|
||||||
|
private BucketCache.WriterThread wt;
|
||||||
|
private BlockingQueue<RAMQueueEntry> q;
|
||||||
|
private Cacheable plainCacheable;
|
||||||
|
private BlockCacheKey plainKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up variables and get BucketCache and WriterThread into state where tests can manually
|
||||||
|
* control the running of WriterThread and BucketCache is empty.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
// Arbitrary capacity.
|
||||||
|
final int capacity = 16;
|
||||||
|
// Run with one writer thread only. Means there will be one writer queue only too. We depend
|
||||||
|
// on this in below.
|
||||||
|
final int writerThreadsCount = 1;
|
||||||
|
this.bc = new BucketCache("heap", capacity, 1, new int [] {1}, writerThreadsCount,
|
||||||
|
capacity, null, 100/*Tolerate ioerrors for 100ms*/);
|
||||||
|
assertEquals(writerThreadsCount, bc.writerThreads.length);
|
||||||
|
assertEquals(writerThreadsCount, bc.writerQueues.size());
|
||||||
|
// Get reference to our single WriterThread instance.
|
||||||
|
this.wt = bc.writerThreads[0];
|
||||||
|
this.q = bc.writerQueues.get(0);
|
||||||
|
// On construction bucketcache WriterThread is blocked on the writer queue so it will not
|
||||||
|
// notice the disabling of the writer until after it has processed an entry. Lets pass one
|
||||||
|
// through after setting disable flag on the writer. We want to disable the WriterThread so
|
||||||
|
// we can run the doDrain manually so we can watch it working and assert it doing right thing.
|
||||||
|
wt.disableWriter();
|
||||||
|
this.plainKey = new BlockCacheKey("f", 0);
|
||||||
|
this.plainCacheable = Mockito.mock(Cacheable.class);
|
||||||
|
bc.cacheBlock(this.plainKey, plainCacheable);
|
||||||
|
while(!bc.ramCache.isEmpty()) Threads.sleep(1);
|
||||||
|
assertTrue(q.isEmpty());
|
||||||
|
// Now writer thread should be disabled.
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (this.bc != null) this.bc.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test non-error case just works.
|
||||||
|
* @throws FileNotFoundException
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=30000)
|
||||||
|
public void testNonErrorCase()
|
||||||
|
throws FileNotFoundException, IOException, InterruptedException {
|
||||||
|
bc.cacheBlock(this.plainKey, this.plainCacheable);
|
||||||
|
doDrainOfOneEntry(this.bc, this.wt, this.q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass through a too big entry and ensure it is cleared from queues and ramCache.
|
||||||
|
* Manually run the WriterThread.
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTooBigEntry() throws InterruptedException {
|
||||||
|
Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
|
||||||
|
Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
|
||||||
|
this.bc.cacheBlock(this.plainKey, tooBigCacheable);
|
||||||
|
doDrainOfOneEntry(this.bc, this.wt, this.q);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then
|
||||||
|
* put it back and process it.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws BucketAllocatorException
|
||||||
|
* @throws CacheFullException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test (timeout=30000)
|
||||||
|
public void testIOE()
|
||||||
|
throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
|
||||||
|
this.bc.cacheBlock(this.plainKey, plainCacheable);
|
||||||
|
RAMQueueEntry rqe = q.remove();
|
||||||
|
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
|
||||||
|
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
|
||||||
|
writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
|
||||||
|
(UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
|
||||||
|
this.q.add(spiedRqe);
|
||||||
|
doDrainOfOneEntry(bc, wt, q);
|
||||||
|
// Cache disabled when ioes w/o ever healing.
|
||||||
|
assertTrue(!bc.isCacheEnabled());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do Cache full exception
|
||||||
|
* @throws IOException
|
||||||
|
* @throws BucketAllocatorException
|
||||||
|
* @throws CacheFullException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
@Test (timeout=30000)
|
||||||
|
public void testCacheFullException()
|
||||||
|
throws CacheFullException, BucketAllocatorException, IOException, InterruptedException {
|
||||||
|
this.bc.cacheBlock(this.plainKey, plainCacheable);
|
||||||
|
RAMQueueEntry rqe = q.remove();
|
||||||
|
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
|
||||||
|
final CacheFullException cfe = new CacheFullException(0, 0);
|
||||||
|
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
|
||||||
|
Mockito.doThrow(cfe).
|
||||||
|
doReturn(mockedBucketEntry).
|
||||||
|
when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
|
||||||
|
(UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any());
|
||||||
|
this.q.add(spiedRqe);
|
||||||
|
doDrainOfOneEntry(bc, wt, q);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
|
||||||
|
final BlockingQueue<RAMQueueEntry> q)
|
||||||
|
throws InterruptedException {
|
||||||
|
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<RAMQueueEntry>(1));
|
||||||
|
wt.doDrain(rqes);
|
||||||
|
assertTrue(q.isEmpty());
|
||||||
|
assertTrue(bc.ramCache.isEmpty());
|
||||||
|
assertEquals(0, bc.heapSize());
|
||||||
|
}
|
||||||
|
}
|
|
@ -158,7 +158,7 @@ public class TestIPC {
|
||||||
TestRpcServer(RpcScheduler scheduler) throws IOException {
|
TestRpcServer(RpcScheduler scheduler) throws IOException {
|
||||||
super(null, "testRpcServer",
|
super(null, "testRpcServer",
|
||||||
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
|
Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
|
||||||
new InetSocketAddress("0.0.0.0", 0), CONF, scheduler);
|
new InetSocketAddress("localhost", 0), CONF, scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -53,7 +53,7 @@ import com.google.protobuf.ServiceException;
|
||||||
*/
|
*/
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
public class TestProtoBufRpc {
|
public class TestProtoBufRpc {
|
||||||
public final static String ADDRESS = "0.0.0.0";
|
public final static String ADDRESS = "localhost";
|
||||||
public static int PORT = 0;
|
public static int PORT = 0;
|
||||||
private InetSocketAddress isa;
|
private InetSocketAddress isa;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.procedure;
|
package org.apache.hadoop.hbase.procedure;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
|
@ -26,9 +28,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.SmallTests;
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
Loading…
Reference in New Issue