SOLR-5666: Using the hdfs write cache can result in appearance of corrupted index.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1561751 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2014-01-27 17:03:09 +00:00
parent 2a36de54dd
commit a193836b86
7 changed files with 134 additions and 97 deletions

View File

@@ -229,6 +229,9 @@ Bug Fixes
 * SOLR-5663: example-DIH uses non-existing column for mapping (case-sensitive)
   (steffkes)
 
+* SOLR-5666: Using the hdfs write cache can result in appearance of corrupted
+  index. (Mark Miller)
+
 Optimizations
 ----------------------

View File

@@ -128,12 +128,10 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory {
         new Object[] {slabSize, bankCount,
             ((long) bankCount * (long) slabSize)});
-    int _1024Size = params.getInt("solr.hdfs.blockcache.bufferstore.1024",
-        8192);
-    int _8192Size = params.getInt("solr.hdfs.blockcache.bufferstore.8192",
-        8192);
-    BufferStore.init(_1024Size, _8192Size, metrics);
+    int bufferSize = params.getInt("solr.hdfs.blockcache.bufferstore.buffersize", 128);
+    int bufferCount = params.getInt("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128);
+    BufferStore.initNewBuffer(bufferSize, bufferCount);
     long totalMemory = (long) bankCount * (long) numberOfBlocksPerBank
         * (long) blockSize;
     try {
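The per-size knobs solr.hdfs.blockcache.bufferstore.1024 and solr.hdfs.blockcache.bufferstore.8192 give way to a single buffer size plus a count. A minimal sketch of the sizing the new defaults imply, assuming the values arrive as system properties (the factory itself reads them from params as shown above):

// Sketch under stated assumptions: property lookup is simplified to system
// properties; HdfsDirectoryFactory actually reads these values from params.
public class BufferStoreDefaults {
  public static void main(String[] args) {
    int bufferSize = Integer.getInteger("solr.hdfs.blockcache.bufferstore.buffersize", 128);
    long bufferCount = Long.getLong("solr.hdfs.blockcache.bufferstore.buffercount", 128 * 128);
    // BufferStore.initNewBuffer(bufferSize, bufferCount) divides its second
    // argument by bufferSize, so the defaults pool 16384 / 128 = 128 buffers.
    System.out.println((bufferCount / bufferSize) + " pooled buffers of " + bufferSize + " bytes");
  }
}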

View File

@@ -138,33 +138,34 @@ public class BlockDirectory extends Directory {
   }
 
   static class CachedIndexInput extends CustomBufferedIndexInput {
-    private IndexInput _source;
-    private int _blockSize;
-    private long _fileLength;
-    private String _cacheName;
-    private Cache _cache;
+    private final Store store;
+    private IndexInput source;
+    private final int blockSize;
+    private final long fileLength;
+    private final String cacheName;
+    private final Cache cache;
 
     public CachedIndexInput(IndexInput source, int blockSize, String name,
         String cacheName, Cache cache, int bufferSize) {
       super(name, bufferSize);
-      _source = source;
-      _blockSize = blockSize;
-      _fileLength = source.length();
-      _cacheName = cacheName;
-      _cache = cache;
+      this.source = source;
+      this.blockSize = blockSize;
+      fileLength = source.length();
+      this.cacheName = cacheName;
+      this.cache = cache;
+      store = BufferStore.instance(blockSize);
     }
 
     @Override
     public IndexInput clone() {
       CachedIndexInput clone = (CachedIndexInput) super.clone();
-      clone._source = (IndexInput) _source.clone();
+      clone.source = (IndexInput) source.clone();
       return clone;
     }
 
     @Override
     public long length() {
-      return _source.length();
+      return source.length();
     }
 
     @Override
@@ -186,7 +187,7 @@ public class BlockDirectory extends Directory {
       // read whole block into cache and then provide needed data
       long blockId = getBlock(position);
       int blockOffset = (int) getPosition(position);
-      int lengthToReadInBlock = Math.min(len, _blockSize - blockOffset);
+      int lengthToReadInBlock = Math.min(len, blockSize - blockOffset);
       if (checkCache(blockId, blockOffset, b, off, lengthToReadInBlock)) {
         return lengthToReadInBlock;
       } else {
@@ -199,25 +200,25 @@ public class BlockDirectory extends Directory {
     private void readIntoCacheAndResult(long blockId, int blockOffset,
         byte[] b, int off, int lengthToReadInBlock) throws IOException {
       long position = getRealPosition(blockId, 0);
-      int length = (int) Math.min(_blockSize, _fileLength - position);
-      _source.seek(position);
-      byte[] buf = BufferStore.takeBuffer(_blockSize);
-      _source.readBytes(buf, 0, length);
+      int length = (int) Math.min(blockSize, fileLength - position);
+      source.seek(position);
+      byte[] buf = store.takeBuffer(blockSize);
+      source.readBytes(buf, 0, length);
       System.arraycopy(buf, blockOffset, b, off, lengthToReadInBlock);
-      _cache.update(_cacheName, blockId, 0, buf, 0, _blockSize);
-      BufferStore.putBuffer(buf);
+      cache.update(cacheName, blockId, 0, buf, 0, blockSize);
+      store.putBuffer(buf);
     }
 
     private boolean checkCache(long blockId, int blockOffset, byte[] b,
         int off, int lengthToReadInBlock) {
-      return _cache.fetch(_cacheName, blockId, blockOffset, b, off,
+      return cache.fetch(cacheName, blockId, blockOffset, b, off,
           lengthToReadInBlock);
     }
 
     @Override
     protected void closeInternal() throws IOException {
-      _source.close();
+      source.close();
     }
   }

View File

@@ -19,34 +19,50 @@ package org.apache.solr.store.blockcache;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BufferStore {
-
-  public static Logger LOG = LoggerFactory.getLogger(BufferStore.class);
-
-  private static BlockingQueue<byte[]> _1024 = setupBuffers(1024, 1);
-  private static BlockingQueue<byte[]> _8192 = setupBuffers(8192, 1);
-
-  public static AtomicLong shardBuffercacheLost = new AtomicLong();
-  public static AtomicLong shardBuffercacheAllocate1024 = new AtomicLong();
-  public static AtomicLong shardBuffercacheAllocate8192 = new AtomicLong();
-  public static AtomicLong shardBuffercacheAllocateOther = new AtomicLong();
-
-  public static void init(int _1024Size, int _8192Size, Metrics metrics) {
-    LOG.info("Initializing the 1024 buffers with [{}] buffers.", _1024Size);
-    _1024 = setupBuffers(1024, _1024Size);
-    LOG.info("Initializing the 8192 buffers with [{}] buffers.", _8192Size);
-    _8192 = setupBuffers(8192, _8192Size);
-    shardBuffercacheLost = metrics.shardBuffercacheLost;
-    shardBuffercacheAllocate1024 = metrics.shardBuffercacheAllocate1024;
-    shardBuffercacheAllocate8192 = metrics.shardBuffercacheAllocate8192;
-    shardBuffercacheAllocateOther = metrics.shardBuffercacheAllocateOther;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class BufferStore implements Store {
+
+  private static final Store EMPTY = new Store() {
+
+    @Override
+    public byte[] takeBuffer(int bufferSize) {
+      return new byte[bufferSize];
+    }
+
+    @Override
+    public void putBuffer(byte[] buffer) {
+    }
+  };
+
+  private final static ConcurrentMap<Integer, BufferStore> bufferStores = new ConcurrentHashMap<Integer, BufferStore>();
+
+  private final BlockingQueue<byte[]> buffers;
+
+  private final int bufferSize;
+
+  public synchronized static void initNewBuffer(int bufferSize, long totalAmount) {
+    if (totalAmount == 0) {
+      return;
+    }
+    BufferStore bufferStore = bufferStores.get(bufferSize);
+    if (bufferStore == null) {
+      long count = totalAmount / bufferSize;
+      if (count > Integer.MAX_VALUE) {
+        count = Integer.MAX_VALUE;
+      }
+      BufferStore store = new BufferStore(bufferSize, (int) count);
+      bufferStores.put(bufferSize, store);
+    }
   }
+
+  private BufferStore(int bufferSize, int count) {
+    this.bufferSize = bufferSize;
+    buffers = setupBuffers(bufferSize, count);
+  }
 
   private static BlockingQueue<byte[]> setupBuffers(int bufferSize, int count) {
     BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(count);
     for (int i = 0; i < count; i++) {
@@ -54,57 +70,44 @@ public class BufferStore {
     }
     return queue;
   }
 
-  public static byte[] takeBuffer(int bufferSize) {
-    switch (bufferSize) {
-      case 1024:
-        return newBuffer1024(_1024.poll());
-      case 8192:
-        return newBuffer8192(_8192.poll());
-      default:
-        return newBuffer(bufferSize);
+  public static Store instance(int bufferSize) {
+    BufferStore bufferStore = bufferStores.get(bufferSize);
+    if (bufferStore == null) {
+      return EMPTY;
     }
+    return bufferStore;
   }
 
-  public static void putBuffer(byte[] buffer) {
+  @Override
+  public byte[] takeBuffer(int bufferSize) {
+    if (this.bufferSize != bufferSize) {
+      throw new RuntimeException("Buffer with length [" + bufferSize + "] does not match buffer size of ["
+          + bufferSize + "]");
+    }
+    return newBuffer(buffers.poll());
+  }
+
+  @Override
+  public void putBuffer(byte[] buffer) {
     if (buffer == null) {
       return;
     }
-    int bufferSize = buffer.length;
-    switch (bufferSize) {
-      case 1024:
-        checkReturn(_1024.offer(buffer));
-        return;
-      case 8192:
-        checkReturn(_8192.offer(buffer));
-        return;
+    if (buffer.length != bufferSize) {
+      throw new RuntimeException("Buffer with length [" + buffer.length + "] does not match buffer size of ["
+          + bufferSize + "]");
     }
+    checkReturn(buffers.offer(buffer));
   }
 
-  private static void checkReturn(boolean offer) {
-    if (!offer) {
-      shardBuffercacheLost.incrementAndGet();
-    }
+  private void checkReturn(boolean offer) {
   }
 
-  private static byte[] newBuffer1024(byte[] buf) {
+  private byte[] newBuffer(byte[] buf) {
     if (buf != null) {
       return buf;
     }
-    shardBuffercacheAllocate1024.incrementAndGet();
-    return new byte[1024];
-  }
-
-  private static byte[] newBuffer8192(byte[] buf) {
-    if (buf != null) {
-      return buf;
-    }
-    shardBuffercacheAllocate8192.incrementAndGet();
-    return new byte[8192];
-  }
-
-  private static byte[] newBuffer(int size) {
-    shardBuffercacheAllocateOther.incrementAndGet();
-    return new byte[size];
+    return new byte[bufferSize];
   }
 }
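For orientation, here is a minimal usage sketch of the reworked pool, assuming only the methods shown in the hunks above (the wrapper class and the chosen sizes are illustrative, not part of the commit):

// Illustrative only: exercises the BufferStore/Store API from the hunks above.
import org.apache.solr.store.blockcache.BufferStore;
import org.apache.solr.store.blockcache.Store;

public class BufferStoreUsage {
  public static void main(String[] args) {
    int bufferSize = 128;
    // Register a pool of 16384 / 128 = 128 reusable 128-byte buffers.
    BufferStore.initNewBuffer(bufferSize, 128 * 128);

    // Callers look the pool up by buffer size; an unregistered size
    // falls back to the EMPTY store, which allocates on demand.
    Store store = BufferStore.instance(bufferSize);
    byte[] buf = store.takeBuffer(bufferSize);
    try {
      // ... fill and read buf ...
    } finally {
      store.putBuffer(buf); // return the buffer so it can be reused
    }
  }
}

The EMPTY fallback means callers never see a null Store: for an unregistered buffer size, takeBuffer simply allocates a fresh array and putBuffer is a no-op.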

View File

@@ -35,6 +35,8 @@ public abstract class CustomBufferedIndexInput extends IndexInput {
   private int bufferLength = 0; // end of valid bytes
   private int bufferPosition = 0; // next byte to read
 
+  private Store store;
+
   @Override
   public byte readByte() throws IOException {
     if (bufferPosition >= bufferLength) refill();
@@ -49,6 +51,7 @@ public abstract class CustomBufferedIndexInput extends IndexInput {
     super(resourceDesc);
     checkBufferSize(bufferSize);
     this.bufferSize = bufferSize;
+    this.store = BufferStore.instance(bufferSize);
   }
 
   private void checkBufferSize(int bufferSize) {
@@ -179,7 +182,7 @@ public abstract class CustomBufferedIndexInput extends IndexInput {
       if (newLength <= 0) throw new EOFException("read past EOF");
       if (buffer == null) {
-        buffer = BufferStore.takeBuffer(bufferSize);
+        buffer = store.takeBuffer(bufferSize);
         seekInternal(bufferStart);
       }
       readInternal(buffer, 0, newLength);
@@ -191,7 +194,7 @@ public abstract class CustomBufferedIndexInput extends IndexInput {
   @Override
   public final void close() throws IOException {
     closeInternal();
-    BufferStore.putBuffer(buffer);
+    store.putBuffer(buffer);
     buffer = null;
   }

View File

@@ -1,6 +1,6 @@
 package org.apache.solr.store.blockcache;
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -38,6 +38,8 @@ public abstract class ReusedBufferedIndexOutput extends IndexOutput {
   /** total length of the file */
   private long fileLength = 0;
 
+  private final Store store;
+
   public ReusedBufferedIndexOutput() {
     this(BUFFER_SIZE);
   }
@@ -45,7 +47,8 @@ public abstract class ReusedBufferedIndexOutput extends IndexOutput {
   public ReusedBufferedIndexOutput(int bufferSize) {
     checkBufferSize(bufferSize);
     this.bufferSize = bufferSize;
-    buffer = BufferStore.takeBuffer(this.bufferSize);
+    store = BufferStore.instance(bufferSize);
+    buffer = store.takeBuffer(this.bufferSize);
   }
 
   protected long getBufferStart() {
@@ -80,7 +83,7 @@ public abstract class ReusedBufferedIndexOutput extends IndexOutput {
   public void close() throws IOException {
     flushBufferToCache();
     closeInternal();
-    BufferStore.putBuffer(buffer);
+    store.putBuffer(buffer);
     buffer = null;
   }

View File

@@ -0,0 +1,26 @@
+package org.apache.solr.store.blockcache;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface Store {
+
+  byte[] takeBuffer(int bufferSize);
+
+  void putBuffer(byte[] buffer);
+}
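The new interface also makes the buffer source pluggable. As a hypothetical illustration (not part of this commit), a decorator over Store could count how often buffers are taken and returned:

// Hypothetical example: a decorator over the new Store interface that
// counts takeBuffer/putBuffer calls while delegating the actual pooling.
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.store.blockcache.Store;

public class CountingStore implements Store {
  private final Store delegate;
  public final AtomicLong takes = new AtomicLong();
  public final AtomicLong puts = new AtomicLong();

  public CountingStore(Store delegate) {
    this.delegate = delegate;
  }

  @Override
  public byte[] takeBuffer(int bufferSize) {
    takes.incrementAndGet();
    return delegate.takeBuffer(bufferSize);
  }

  @Override
  public void putBuffer(byte[] buffer) {
    puts.incrementAndGet();
    delegate.putBuffer(buffer);
  }
}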