diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2ea65fd8d2c..518c52c2284 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -159,6 +159,10 @@ Bug Fixes
 
 * SOLR-10104: BlockDirectoryCache release hooks do not work with multiple directories. (Mike Drob, Mark Miller)
 
+* SOLR-10121: Fix race conditions in HDFS BlockCache that can contribute to corruption in high
+  concurrency situations. (yonik)
+
+
 Optimizations
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
index f00ca1ddaf4..20d2721bdcf 100644
--- a/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
+++ b/solr/core/src/java/org/apache/solr/store/blockcache/BlockCache.java
@@ -94,7 +94,10 @@ public class BlockCache {
     }
     int bankId = location.getBankId();
     int block = location.getBlock();
+
+    // mark the block removed before we release the lock to allow it to be reused
     location.setRemoved(true);
+
     locks[bankId].clear(block);
     lockCounters[bankId].decrementAndGet();
     for (OnRelease onRelease : onReleases) {
@@ -105,7 +108,8 @@
   }
 
   /**
-   * This is only best-effort... it's possible for false to be returned.
+   * This is only best-effort... it's possible for false to be returned, meaning the block was not able to be cached.
+   * NOTE: blocks may not currently be updated (false will be returned if the block is already cached)
    * The blockCacheKey is cloned before it is inserted into the map, so it may be reused by clients if desired.
    *
    * @param blockCacheKey the key for the block
@@ -123,9 +127,7 @@
           + blockOffset + "]");
     }
     BlockCacheLocation location = cache.getIfPresent(blockCacheKey);
-    boolean newLocation = false;
     if (location == null) {
-      newLocation = true;
       location = new BlockCacheLocation();
       if (!findEmptyLocation(location)) {
         // YCS: it looks like when the cache is full (a normal scenario), then two concurrent writes will result in one of them failing
@@ -133,11 +135,12 @@
         // TODO: simplest fix would be to leave more than one block empty
         return false;
       }
-    }
-
-    // YCS: I think this means that the block existed, but it is in the process of being
-    // concurrently removed. This flag is set in the releaseLocation eviction listener.
-    if (location.isRemoved()) {
+    } else {
+      // If we allocated a new block, then it has never been published and is thus never in danger of being concurrently removed.
+      // On the other hand, if this is an existing block we are updating, it may concurrently be removed and reused for another
+      // purpose (and then our write may overwrite that). This can happen even if clients never try to update existing blocks,
+      // since two clients can try to cache the same block concurrently. Because of this, the ability to update an existing
+      // block has been removed for the time being (see SOLR-10121).
       return false;
     }
 
@@ -146,10 +149,10 @@
     ByteBuffer bank = getBank(bankId);
     bank.position(bankOffset + blockOffset);
     bank.put(data, offset, length);
-    if (newLocation) {
-      cache.put(blockCacheKey.clone(), location);
-      metrics.blockCacheSize.incrementAndGet();
-    }
+
+    // make sure all modifications to the block have been completed before we publish it.
+    cache.put(blockCacheKey.clone(), location);
+    metrics.blockCacheSize.incrementAndGet();
     return true;
   }
 
@@ -167,16 +170,20 @@
     if (location == null) {
       return false;
     }
-    if (location.isRemoved()) {
-      // location is in the process of being removed and the block may have already been reused by this point.
-      return false;
-    }
+
     int bankId = location.getBankId();
     int bankOffset = location.getBlock() * blockSize;
     location.touch();
     ByteBuffer bank = getBank(bankId);
     bank.position(bankOffset + blockOffset);
     bank.get(buffer, off, length);
+
+    if (location.isRemoved()) {
+      // must check *after* the read is done since the bank may have been reused for another block
+      // before or during the read.
+      return false;
+    }
+
     return true;
   }
 
diff --git a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
index 795518dbe50..0389f43577d 100644
--- a/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blockcache/BlockCacheTest.java
@@ -25,6 +25,7 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.junit.Test;
 
 public class BlockCacheTest extends LuceneTestCase {
+
   @Test
   public void testBlockCache() {
     int blocksInTest = 2000000;
@@ -60,8 +61,9 @@
       byte[] testData = testData(random, blockSize, newData);
 
       long t1 = System.nanoTime();
-      blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
+      boolean success = blockCache.store(blockCacheKey, 0, testData, 0, blockSize);
       storeTime += (System.nanoTime() - t1);
+      if (!success) continue;  // for now, updating existing blocks is not supported... see SOLR-10121
 
       long t3 = System.nanoTime();
       if (blockCache.fetch(blockCacheKey, buffer)) {
@@ -76,33 +78,6 @@
     System.out.println("# of Elements = " + blockCache.getSize());
   }
 
-  /**
-   * Verify checking of buffer size limits against the cached block size.
-   */
-  @Test
-  public void testLongBuffer() {
-    Random random = random();
-    int blockSize = BlockCache._32K;
-    int slabSize = blockSize * 1024;
-    long totalMemory = 2 * slabSize;
-
-    BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize);
-    BlockCacheKey blockCacheKey = new BlockCacheKey();
-    blockCacheKey.setBlock(0);
-    blockCacheKey.setFile(0);
-    blockCacheKey.setPath("/");
-    byte[] newData = new byte[blockSize*3];
-    byte[] testData = testData(random, blockSize, newData);
-
-    assertTrue(blockCache.store(blockCacheKey, 0, testData, 0, blockSize));
-    assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize, blockSize));
-    assertTrue(blockCache.store(blockCacheKey, 0, testData, blockSize*2, blockSize));
-
-    assertTrue(blockCache.store(blockCacheKey, 1, testData, 0, blockSize - 1));
-    assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize, blockSize - 1));
-    assertTrue(blockCache.store(blockCacheKey, 1, testData, blockSize*2, blockSize - 1));
-  }
-
   private static byte[] testData(Random random, int size, byte[] buf) {
     random.nextBytes(buf);
     return buf;
@@ -123,22 +98,23 @@
   public void testBlockCacheConcurrent() throws Exception {
     Random rnd = random();
 
+    final int blocksInTest = 400;  // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
+    final int blockSize = 64;
+    final int slabSize = blocksInTest * blockSize / 4;
+    final long totalMemory = 2 * slabSize;  // 2 slabs of memory, so only half of what is needed for all blocks
+
     /***
-    final int blocksInTest = 256;
-    final int blockSize = 1024;
-    final int slabSize = blockSize * 128;
-    final long totalMemory = 2 * slabSize;
-    ***/
+     final int blocksInTest = 16384;  // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
+     final int blockSize = 1024;
+     final int slabSize = blocksInTest * blockSize / 4;
+     final long totalMemory = 2 * slabSize;  // 2 slabs of memory, so only half of what is needed for all blocks
+     ***/
 
-    final int blocksInTest = 16384; // pick something that won't fit in memory, but is small enough to cause a medium hit rate. 16MB of blocks is double the total memory size of the cache.
-    final int blockSize = 1024;
-    final int slabSize = blockSize * 4096;
-    final long totalMemory = 2 * slabSize; // should give us 2 slabs (8MB)
-
-    final int nThreads=2;
+    final int nThreads=64;
     final int nReads=1000000;
     final int readsPerThread=nReads/nThreads;
     final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues
+    final int showErrors=50; // show first 50 validation failures
 
     final BlockCache blockCache = new BlockCache(new Metrics(), true, totalMemory, slabSize, blockSize);
 
@@ -147,6 +123,7 @@
     final AtomicLong hitsInCache = new AtomicLong();
     final AtomicLong missesInCache = new AtomicLong();
     final AtomicLong storeFails = new AtomicLong();
     final AtomicLong lastBlock = new AtomicLong();
+    final AtomicLong validateFails = new AtomicLong(0);
 
     final int file = 0;
 
@@ -158,7 +135,7 @@
 
       threads[i] = new Thread() {
         Random r;
-        BlockCacheKey blockCacheKey = new BlockCacheKey();
+        BlockCacheKey blockCacheKey;
         byte[] buffer = new byte[blockSize];
 
         @Override
@@ -201,8 +178,9 @@
               for (int i = 0; i < len; i++) {
                 long globalPos = globalOffset + i;
                 if (buffer[i] != getByte(globalPos)) {
-                  System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
                   failed.set(true);
+                  if (validateFails.incrementAndGet() <= showErrors) System.out.println("ERROR: read was " + "block=" + block + " blockOffset=" + blockOffset + " len=" + len + " globalPos=" + globalPos + " localReadOffset=" + i + " got=" + buffer[i] + " expected=" + getByte(globalPos));
+                  break;
                 }
               }
             } else {
@@ -236,6 +214,7 @@
     System.out.println("Cache Hits = " + hitsInCache.get());
     System.out.println("Cache Misses = " + missesInCache.get());
     System.out.println("Cache Store Fails = " + storeFails.get());
+    System.out.println("Blocks with Errors = " + validateFails.get());
 
     assertFalse( failed.get() );
   }
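
Note: the two fixes above follow one optimistic-concurrency pattern: on the store side, a location is published to the cache map only after its bytes are fully written (and an already-published block is never rewritten); on the fetch side, the removed flag is validated only *after* the bytes are copied out, since the bank space may be reused at any time once the block is marked removed. The following is a minimal standalone sketch of that pattern, not Solr code: the names (OptimisticBlockCacheSketch, Location, BLOCK_SIZE, evict) are invented for illustration, a plain byte[] stands in for a ByteBuffer slab slice, and putIfAbsent is used where the patch relies on getIfPresent followed by put.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicBoolean;

    class OptimisticBlockCacheSketch {
      static final int BLOCK_SIZE = 64;

      static final class Location {
        final byte[] bank = new byte[BLOCK_SIZE];          // stands in for a slab slice
        final AtomicBoolean removed = new AtomicBoolean(); // set by the eviction path
      }

      final ConcurrentHashMap<Long, Location> cache = new ConcurrentHashMap<>();

      boolean store(long key, byte[] data) {
        if (cache.containsKey(key)) {
          // Updating a published block is unsafe: it may be concurrently evicted and
          // its bank space reused, so our write could clobber another block's bytes.
          return false;
        }
        Location loc = new Location();  // private to this thread until published
        System.arraycopy(data, 0, loc.bank, 0, BLOCK_SIZE);
        // Publish only after the write completes; putIfAbsent keeps two concurrent
        // storers of the same key from both appearing to succeed.
        return cache.putIfAbsent(key, loc) == null;
      }

      boolean fetch(long key, byte[] buffer) {
        Location loc = cache.get(key);
        if (loc == null) return false;
        System.arraycopy(loc.bank, 0, buffer, 0, BLOCK_SIZE);
        // Validate *after* the copy: the bank may have been reused for another block
        // before or during the read, in which case the copied bytes are stale.
        return !loc.removed.get();
      }

      void evict(long key) {
        Location loc = cache.remove(key);
        if (loc != null) loc.removed.set(true);  // mark before the bank space is reused
      }
    }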
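On the trade-off: as I read the patch, giving up in-place updates of a published block is acceptable because the block cache is purely an optimization. A store() that returns false simply leaves the block uncached, and a fetch() that fails post-read validation is treated as a miss, so the caller falls back to the underlying directory; the test change (if (!success) continue;) encodes the same contract.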