better writing of local md5 caches

This commit is contained in:
kimchy 2010-08-21 02:16:36 +03:00
parent d97695ad10
commit 0c72bb2125
7 changed files with 57 additions and 55 deletions

View File

@ -531,4 +531,26 @@ public class Digest {
public static String shaHex(String data) {
return Hex.encodeHexString(sha(data));
}
public static final NullDigest NULL_DIGEST = new NullDigest("null");
private static final class NullDigest extends MessageDigest {
private NullDigest(String algorithm) {
super(algorithm);
}
@Override protected void engineUpdate(byte input) {
}
@Override protected void engineUpdate(byte[] input, int offset, int len) {
}
@Override protected byte[] engineDigest() {
return null;
}
@Override protected void engineReset() {
}
}
}

View File

@ -197,6 +197,11 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
listener.onRecoveryDone();
scheduleSnapshotIfNeeded();
} catch (IndexShardGatewayRecoveryException e) {
if (indexShard.state() == IndexShardState.CLOSED) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");
return;
}
if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) {
// got closed on us, just ignore this recovery
listener.onIgnoreRecovery("shard closed");

View File

@ -597,7 +597,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
if (!failures.isEmpty()) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery index", failures.get(0));
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recover index", failures.get(0));
}
// read the gateway data persisted

View File

@ -58,11 +58,6 @@ public interface IndexStore extends IndexComponent {
*/
ByteSizeValue backingStoreFreeSpace();
/**
* Lists all unallocated stores.
*/
StoreFilesMetaData[] listUnallocatedStores() throws IOException;
void deleteUnallocated(ShardId shardId) throws IOException;
/**

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.store.fs;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.Digest;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
@ -40,7 +39,6 @@ import org.elasticsearch.index.store.support.AbstractIndexStore;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -95,27 +93,10 @@ public abstract class FsIndexStore extends AbstractIndexStore {
if (indexService.hasShard(shardId.id())) {
throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted");
}
cachedUnallocatedMd5s.remove(shardId);
FileSystemUtils.deleteRecursively(shardLocation(shardId));
}
@Override public StoreFilesMetaData[] listUnallocatedStores() throws IOException {
if (location == null) {
return new StoreFilesMetaData[0];
}
File[] shardLocations = location.listFiles();
if (shardLocations == null || shardLocations.length == 0) {
return new StoreFilesMetaData[0];
}
List<StoreFilesMetaData> shards = Lists.newArrayList();
for (File shardLocation : shardLocations) {
int shardId = Integer.parseInt(shardLocation.getName());
if (!indexService.hasShard(shardId)) {
shards.add(listUnallocatedStoreMetaData(new ShardId(index, shardId)));
}
}
return shards.toArray(new StoreFilesMetaData[shards.size()]);
}
@Override protected StoreFilesMetaData listUnallocatedStoreMetaData(ShardId shardId) throws IOException {
if (location == null) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());

View File

@ -48,10 +48,6 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
// do nothing here...
}
@Override public StoreFilesMetaData[] listUnallocatedStores() throws IOException {
return new StoreFilesMetaData[0];
}
@Override public StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId.id());
if (indexShard == null) {

View File

@ -98,12 +98,8 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
return null;
}
if (md.md5() == null) {
byte[] md5Bytes = Digest.md5HexToByteArray(md5);
if (shouldWriteMd5(name)) {
IndexOutput output = directory().createOutput(name + ".md5");
output.writeBytes(md5Bytes, md5Bytes.length);
output.close();
writeMd5File(directory(), name, md5);
}
md = new StoreFileMetaData(md.name(), md.sizeInBytes(), md.sizeInBytes(), md5);
@ -164,7 +160,14 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
}
private boolean shouldWriteMd5(String name) {
return !name.startsWith("segments");
return !name.startsWith("segments") && !name.endsWith(".md5");
}
private void writeMd5File(Directory directory, String file, String md5) throws IOException {
byte[] md5Bytes = Digest.md5HexToByteArray(md5);
IndexOutput output = directory.createOutput(file + ".md5");
output.writeBytes(md5Bytes, md5Bytes.length);
output.close();
}
/**
@ -188,10 +191,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
if (md5 != null) {
if (shouldWriteMd5(file)) {
byte[] md5Bytes = Digest.md5HexToByteArray(md5);
IndexOutput output = delegate.createOutput(file + ".md5");
output.writeBytes(md5Bytes, md5Bytes.length);
output.close();
writeMd5File(delegate, file, md5);
}
}
@ -328,14 +328,16 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
private final String name;
private final MessageDigest digest;
private boolean ignoreDigest = false;
private MessageDigest digest;
private StoreIndexOutput(IndexOutput delegate, String name) {
this.delegate = delegate;
this.name = name;
if (shouldWriteMd5(name)) {
this.digest = Digest.getMd5Digest();
} else {
this.digest = Digest.NULL_DIGEST;
}
}
@Override public void close() throws IOException {
@ -343,14 +345,14 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
synchronized (mutex) {
StoreFileMetaData md = filesMetadata.get(name);
String md5 = md == null ? null : md.md5();
if (!ignoreDigest) {
md5 = Hex.encodeHexString(digest.digest());
byte[] digestBytes = digest.digest();
if (digestBytes != null) {
md5 = Hex.encodeHexString(digestBytes);
if (shouldWriteMd5(name)) {
writeMd5File(directory(), name, md5);
}
if (md == null) {
md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), md5);
} else {
md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), md5);
}
md = new StoreFileMetaData(name, directory().fileLength(name), directory().fileModified(name), md5);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, md).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
}
@ -366,9 +368,10 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
digest.update(b, offset, length);
}
@Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
delegate.copyBytes(input, numBytes);
}
// don't override it, base class method simple reads from input and writes to this output
// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
// delegate.copyBytes(input, numBytes);
// }
@Override public void flush() throws IOException {
delegate.flush();
@ -381,7 +384,7 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
@Override public void seek(long pos) throws IOException {
delegate.seek(pos);
// once we seek, digest is not applicable
ignoreDigest = true;
digest = Digest.NULL_DIGEST;
}
@Override public long length() throws IOException {