SOLR-10121: fix race conditions in BlockCache.fetch and BlockCache.store

This commit is contained in:
yonik 2017-02-15 09:56:50 -05:00
parent b6f49dc1fb
commit b71a667d74
3 changed files with 47 additions and 57 deletions

View File

@ -159,6 +159,10 @@ Bug Fixes
* SOLR-10104: BlockDirectoryCache release hooks do not work with multiple directories. (Mike Drob, Mark Miller) * SOLR-10104: BlockDirectoryCache release hooks do not work with multiple directories. (Mike Drob, Mark Miller)
* SOLR-10121: Fix race conditions in HDFS BlockCache that can contribute to corruption in high
concurrency situations. (yonik)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -94,7 +94,10 @@ public class BlockCache {
} }
int bankId = location.getBankId(); int bankId = location.getBankId();
int block = location.getBlock(); int block = location.getBlock();
// mark the block removed before we release the lock to allow it to be reused
location.setRemoved(true); location.setRemoved(true);
locks[bankId].clear(block); locks[bankId].clear(block);
lockCounters[bankId].decrementAndGet(); lockCounters[bankId].decrementAndGet();
for (OnRelease onRelease : onReleases) { for (OnRelease onRelease : onReleases) {
@ -105,7 +108,8 @@ public class BlockCache {
} }
/** /**
* This is only best-effort... it's possible for false to be returned. * This is only best-effort... it's possible for false to be returned, meaning the block was not able to be cached.
* NOTE: blocks may not currently be updated (false will be returned if the block is already cached)
* The blockCacheKey is cloned before it is inserted into the map, so it may be reused by clients if desired. * The blockCacheKey is cloned before it is inserted into the map, so it may be reused by clients if desired.
* *
* @param blockCacheKey the key for the block * @param blockCacheKey the key for the block
@ -123,9 +127,7 @@ public class BlockCache {
+ blockOffset + "]"); + blockOffset + "]");
} }
BlockCacheLocation location = cache.getIfPresent(blockCacheKey); BlockCacheLocation location = cache.getIfPresent(blockCacheKey);
boolean newLocation = false;
if (location == null) { if (location == null) {
newLocation = true;
location = new BlockCacheLocation(); location = new BlockCacheLocation();
if (!findEmptyLocation(location)) { if (!findEmptyLocation(location)) {
// YCS: it looks like when the cache is full (a normal scenario), then two concurrent writes will result in one of them failing // YCS: it looks like when the cache is full (a normal scenario), then two concurrent writes will result in one of them failing
@ -133,11 +135,12 @@ public class BlockCache {
// TODO: simplest fix would be to leave more than one block empty // TODO: simplest fix would be to leave more than one block empty
return false; return false;
} }
} } else {
// If we allocated a new block, then it has never been published and is thus never in danger of being concurrently removed.
// YCS: I think this means that the block existed, but it is in the process of being // On the other hand, if this is an existing block we are updating, it may concurrently be removed and reused for another
// concurrently removed. This flag is set in the releaseLocation eviction listener. // purpose (and then our write may overwrite that). This can happen even if clients never try to update existing blocks,
if (location.isRemoved()) { // since two clients can try to cache the same block concurrently. Because of this, the ability to update an existing
// block has been removed for the time being (see SOLR-10121).
return false; return false;
} }
@ -146,10 +149,10 @@ public class BlockCache {
ByteBuffer bank = getBank(bankId); ByteBuffer bank = getBank(bankId);
bank.position(bankOffset + blockOffset); bank.position(bankOffset + blockOffset);
bank.put(data, offset, length); bank.put(data, offset, length);
if (newLocation) {
cache.put(blockCacheKey.clone(), location); // make sure all modifications to the block have been completed before we publish it.
metrics.blockCacheSize.incrementAndGet(); cache.put(blockCacheKey.clone(), location);
} metrics.blockCacheSize.incrementAndGet();
return true; return true;
} }
@ -167,16 +170,20 @@ public class BlockCache {
if (location == null) { if (location == null) {
return false; return false;
} }
if (location.isRemoved()) {
// location is in the process of being removed and the block may have already been reused by this point.
return false;
}
int bankId = location.getBankId(); int bankId = location.getBankId();
int bankOffset = location.getBlock() * blockSize; int bankOffset = location.getBlock() * blockSize;
location.touch(); location.touch();
ByteBuffer bank = getBank(bankId); ByteBuffer bank = getBank(bankId);
bank.position(bankOffset + blockOffset); bank.position(bankOffset + blockOffset);
bank.get(buffer, off, length); bank.get(buffer, off, length);
if (location.isRemoved()) {
// must check *after* the read is done since the bank may have been reused for another block
// before or during the read.
return false;
}
return true; return true;
} }

View File

@ -25,6 +25,7 @@ import org.apache.lucene.util.LuceneTestCase;
import org.junit.Test; import org.junit.Test;
public class BlockCacheTest extends LuceneTestCase { public class BlockCacheTest extends LuceneTestCase {
@Test @Test
public void testBlockCache() { public void testBlockCache() {
int blocksInTest = 2000000; int blocksInTest = 2000000;
@ -60,8 +61,9 @@ public class BlockCacheTest extends LuceneTestCase {
byte[] testData = testData(random, blockSize, newData); byte[] testData = testData(random, blockSize, newData);
long t1 = System.nanoTime(); long t1 = System.nanoTime();
blockCache.store(blockCacheKey, 0, testData, 0, blockSize); boolean success = blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
storeTime += (System.nanoTime() - t1); storeTime += (System.nanoTime() - t1);
if (!success) continue; // for now, updating existing blocks is not supported... see SOLR-10121
long t3 = System.nanoTime(); long t3 = System.nanoTime();
if (blockCache.fetch(blockCacheKey, buffer)) { if (blockCache.fetch(blockCacheKey, buffer)) {
@ -76,33 +78,6 @@ public class BlockCacheTest extends LuceneTestCase {
System.out.println("# of Elements = " + blockCache.getSize()); System.out.println("# of Elements = " + blockCache.getSize());
} }
/**
* Verify checking of buffer size limits against the cached block size.
*/
@Test
public void testLongBuffer() {
Random random = random();
int blockSize = BlockCache._32K;
int slabSize = blockSize * 1024;
long totalMemory = 2 * slabSize;
BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize);
BlockCacheKey blockCacheKey = new BlockCacheKey();
blockCacheKey.setBlock(0);
blockCacheKey.setFile(0);
blockCacheKey.setPath("/");
byte[] newData = new byte[blockSize*3];
byte[] testData = testData(random, blockSize, newData);
assertTrue(blockCache.store(blockCacheKey, 0, testData, 0, blockSize));
assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize, blockSize));
assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize*2, blockSize));
assertTrue(blockCache.store(blockCacheKey, 1, testData, 0, blockSize - 1));
assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize, blockSize - 1));
assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize*2, blockSize - 1));
}
private static byte[] testData(Random random, int size, byte[] buf) { private static byte[] testData(Random random, int size, byte[] buf) {
random.nextBytes(buf); random.nextBytes(buf);
return buf; return buf;
@ -123,22 +98,23 @@ public class BlockCacheTest extends LuceneTestCase {
public void testBlockCacheConcurrent() throws Exception { public void testBlockCacheConcurrent() throws Exception {
Random rnd = random(); Random rnd = random();
final int blocksInTest = 400; // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
final int blockSize = 64;
final int slabSize = blocksInTest * blockSize / 4;
final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks
/*** /***
final int blocksInTest = 256; final int blocksInTest = 16384; // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
final int blockSize = 1024; final int blockSize = 1024;
final int slabSize = blockSize * 128; final int slabSize = blocksInTest * blockSize / 4;
final long totalMemory = 2 * slabSize; final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks
***/ ***/
final int blocksInTest = 16384; // pick something that won't fit in memory, but is small enough to cause a medium hit rate. 16MB of blocks is double the total memory size of the cache. final int nThreads=64;
final int blockSize = 1024;
final int slabSize = blockSize * 4096;
final long totalMemory = 2 * slabSize; // should give us 2 slabs (8MB)
final int nThreads=2;
final int nReads=1000000; final int nReads=1000000;
final int readsPerThread=nReads/nThreads; final int readsPerThread=nReads/nThreads;
final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues
final int showErrors=50; // show first 50 validation failures
final BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize, blockSize); final BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize, blockSize);
@ -147,6 +123,7 @@ public class BlockCacheTest extends LuceneTestCase {
final AtomicLong missesInCache = new AtomicLong(); final AtomicLong missesInCache = new AtomicLong();
final AtomicLong storeFails = new AtomicLong(); final AtomicLong storeFails = new AtomicLong();
final AtomicLong lastBlock = new AtomicLong(); final AtomicLong lastBlock = new AtomicLong();
final AtomicLong validateFails = new AtomicLong(0);
final int file = 0; final int file = 0;
@ -158,7 +135,7 @@ public class BlockCacheTest extends LuceneTestCase {
threads[i] = new Thread() { threads[i] = new Thread() {
Random r; Random r;
BlockCacheKey blockCacheKey = new BlockCacheKey(); BlockCacheKey blockCacheKey;
byte[] buffer = new byte[blockSize]; byte[] buffer = new byte[blockSize];
@Override @Override
@ -201,8 +178,9 @@ public class BlockCacheTest extends LuceneTestCase {
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
long globalPos = globalOffset + i; long globalPos = globalOffset + i;
if (buffer[i] != getByte(globalPos)) { if (buffer[i] != getByte(globalPos)) {
System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
failed.set(true); failed.set(true);
if (validateFails.incrementAndGet() <= showErrors) System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
break;
} }
} }
} else { } else {
@ -236,6 +214,7 @@ public class BlockCacheTest extends LuceneTestCase {
System.out.println("Cache Hits = " + hitsInCache.get()); System.out.println("Cache Hits = " + hitsInCache.get());
System.out.println("Cache Misses = " + missesInCache.get()); System.out.println("Cache Misses = " + missesInCache.get());
System.out.println("Cache Store Fails = " + storeFails.get()); System.out.println("Cache Store Fails = " + storeFails.get());
System.out.println("Blocks with Errors = " + validateFails.get());
assertFalse( failed.get() ); assertFalse( failed.get() );
} }