verify operations on store dir are executed when its open

call ensureOpen and properly set the open flag
also, better handling of failures and error message during listAll in local recovery
This commit is contained in:
Shay Banon 2013-11-03 14:23:58 +01:00
parent 8f88d0aa4a
commit 671d2dd650
2 changed files with 20 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
@ -103,12 +104,12 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
SegmentInfos si = null; SegmentInfos si = null;
try { try {
si = Lucene.readSegmentInfos(indexShard.store().directory()); si = Lucene.readSegmentInfos(indexShard.store().directory());
} catch (Exception e) { } catch (Throwable e) {
String files = "_unknown_"; String files = "_unknown_";
try { try {
files = Arrays.toString(indexShard.store().directory().listAll()); files = Arrays.toString(indexShard.store().directory().listAll());
} catch (Exception e1) { } catch (Throwable e1) {
// ignore files += " (failure=" + ExceptionsHelper.detailedMessage(e1) + ")";
} }
if (indexShouldExists && indexShard.store().indexStore().persistent()) { if (indexShouldExists && indexShard.store().indexStore().persistent()) {
throw new IndexShardGatewayRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e); throw new IndexShardGatewayRecoveryException(shardId(), "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, e);
@ -131,8 +132,8 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
writer.close(); writer.close();
} }
} }
} catch (IOException e) { } catch (Throwable e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); throw new IndexShardGatewayRecoveryException(shardId(), "failed to fetch index version after copying it over", e);
} }
recoveryStatus.index().updateVersion(version); recoveryStatus.index().updateVersion(version);
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());

View File

@ -233,7 +233,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
} finally { } finally {
output.close(); output.close();
} }
} }
for (StoreFileMetaData metaData : files.values()) { for (StoreFileMetaData metaData : files.values()) {
if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) { if (metaData.name().startsWith(CHECKSUMS_PREFIX) && !checksumName.equals(metaData.name())) {
@ -334,21 +334,25 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
@Override @Override
public void copy(Directory to, String src, String dest, IOContext context) throws IOException { public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
ensureOpen();
// lets the default implementation happen, so we properly open an input and create an output // lets the default implementation happen, so we properly open an input and create an output
super.copy(to, src, dest, context); super.copy(to, src, dest, context);
} }
@Override @Override
public String[] listAll() throws IOException { public String[] listAll() throws IOException {
ensureOpen();
return files; return files;
} }
@Override @Override
public boolean fileExists(String name) throws IOException { public boolean fileExists(String name) throws IOException {
ensureOpen();
return filesMetadata.containsKey(name); return filesMetadata.containsKey(name);
} }
public void deleteFileChecksum(String name) throws IOException { public void deleteFileChecksum(String name) throws IOException {
ensureOpen();
StoreFileMetaData metaData = filesMetadata.get(name); StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData != null) { if (metaData != null) {
try { try {
@ -367,6 +371,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
@Override @Override
public void deleteFile(String name) throws IOException { public void deleteFile(String name) throws IOException {
ensureOpen();
// we don't allow to delete the checksums files, only using the deleteChecksum method // we don't allow to delete the checksums files, only using the deleteChecksum method
if (isChecksum(name)) { if (isChecksum(name)) {
return; return;
@ -393,6 +398,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
*/ */
@Override @Override
public long fileLength(String name) throws IOException { public long fileLength(String name) throws IOException {
ensureOpen();
StoreFileMetaData metaData = filesMetadata.get(name); StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) { if (metaData == null) {
throw new FileNotFoundException(name); throw new FileNotFoundException(name);
@ -410,6 +416,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
} }
public IndexOutput createOutput(String name, IOContext context, boolean raw) throws IOException { public IndexOutput createOutput(String name, IOContext context, boolean raw) throws IOException {
ensureOpen();
Directory directory; Directory directory;
if (isChecksum(name)) { if (isChecksum(name)) {
directory = distributor.primary(); directory = distributor.primary();
@ -433,7 +440,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
if (computeChecksum) { if (computeChecksum) {
out = new BufferedChecksumIndexOutput(out, new Adler32()); out = new BufferedChecksumIndexOutput(out, new Adler32());
} }
final StoreIndexOutput storeIndexOutput = new StoreIndexOutput(metaData, out, name); final StoreIndexOutput storeIndexOutput = new StoreIndexOutput(metaData, out, name);
success = true; success = true;
return storeIndexOutput; return storeIndexOutput;
@ -447,6 +454,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
@Override @Override
public IndexInput openInput(String name, IOContext context) throws IOException { public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
StoreFileMetaData metaData = filesMetadata.get(name); StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) { if (metaData == null) {
throw new FileNotFoundException(name); throw new FileNotFoundException(name);
@ -472,6 +480,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
@Override @Override
public IndexInputSlicer createSlicer(String name, IOContext context) throws IOException { public IndexInputSlicer createSlicer(String name, IOContext context) throws IOException {
ensureOpen();
StoreFileMetaData metaData = filesMetadata.get(name); StoreFileMetaData metaData = filesMetadata.get(name);
if (metaData == null) { if (metaData == null) {
throw new FileNotFoundException(name); throw new FileNotFoundException(name);
@ -486,7 +495,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
} }
@Override @Override
public void close() throws IOException { public synchronized void close() throws IOException {
isOpen = false;
for (Directory delegate : distributor.all()) { for (Directory delegate : distributor.all()) {
delegate.close(); delegate.close();
} }
@ -523,6 +533,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
@Override @Override
public void sync(Collection<String> names) throws IOException { public void sync(Collection<String> names) throws IOException {
ensureOpen();
if (sync) { if (sync) {
Map<Directory, Collection<String>> map = Maps.newHashMap(); Map<Directory, Collection<String>> map = Maps.newHashMap();
for (String name : names) { for (String name : names) {