Fix several places where resources are not closed

This commit adds a test that checks that we are closing
all files even if there are random IOExceptions happening. The test
found several places where due to IOExceptions streams were not
closed due to unsufficient exception handling.
This commit is contained in:
Simon Willnauer 2013-09-12 23:24:27 +02:00
parent 439413c626
commit 507b6a6e8c
5 changed files with 122 additions and 25 deletions

View File

@ -56,8 +56,12 @@ public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
@Override
public void close() throws IOException {
super.close();
out.close();
try {
super.close();
} finally {
out.close();
}
}
@Override

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.Compressor;
@ -416,21 +417,31 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
directory = distributor.any();
}
IndexOutput out = directory.createOutput(name, context);
synchronized (mutex) {
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, null, directory);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
boolean computeChecksum = !raw;
if (computeChecksum) {
// don't compute checksum for segment based files
if ("segments.gen".equals(name) || name.startsWith("segments")) {
computeChecksum = false;
boolean success = false;
try {
synchronized (mutex) {
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, null, directory);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
boolean computeChecksum = !raw;
if (computeChecksum) {
// don't compute checksum for segment based files
if ("segments.gen".equals(name) || name.startsWith("segments")) {
computeChecksum = false;
}
}
if (computeChecksum) {
out = new BufferedChecksumIndexOutput(out, new Adler32());
}
final StoreIndexOutput storeIndexOutput = new StoreIndexOutput(metaData, out, name);
success = true;
return storeIndexOutput;
}
if (computeChecksum) {
out = new BufferedChecksumIndexOutput(out, new Adler32());
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(out);
}
return new StoreIndexOutput(metaData, out, name);
}
}
@ -441,11 +452,19 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
throw new FileNotFoundException(name);
}
IndexInput in = metaData.directory().openInput(name, context);
// Only for backward comp. since we now use Lucene codec compression
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
Compressor compressor = CompressorFactory.compressor(in);
if (compressor != null) {
in = compressor.indexInput(in);
boolean success = false;
try {
// Only for backward comp. since we now use Lucene codec compression
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
Compressor compressor = CompressorFactory.compressor(in);
if (compressor != null) {
in = compressor.indexInput(in);
}
}
success = true;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(in);
}
}
return in;

View File

@ -392,7 +392,7 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase {
}
}
if (forceRefresh) {
client().admin().indices().prepareRefresh(index).execute().get();
assertNoFailures(client().admin().indices().prepareRefresh(index).execute().get());
}
}

View File

@ -42,11 +42,11 @@ import java.util.Random;
import java.util.Set;
public class MockDirectoryHelper {
public static final String RANDOM_IO_EXCEPTION_RATE = "store.mock.random.io_exception_rate";
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "store.mock.random.io_exception_rate_on_open";
public static final String RANDOM_SEED = "store.mock.random.seed";
public static final String RANDOM_THROTTLE = "store.mock.random.throttle";
public static final String CHECK_INDEX_ON_CLOSE = "store.mock.check_index_on_close";
public static final String RANDOM_IO_EXCEPTION_RATE = "index.store.mock.random.io_exception_rate";
public static final String RANDOM_IO_EXCEPTION_RATE_ON_OPEN = "index.store.mock.random.io_exception_rate_on_open";
public static final String RANDOM_SEED = "index.store.mock.random.seed";
public static final String RANDOM_THROTTLE = "index.store.mock.random.throttle";
public static final String CHECK_INDEX_ON_CLOSE = "index.store.mock.check_index_on_close";
public static final Set<MockDirectoryWrapper> wrappers = ConcurrentCollections.newConcurrentSet();
private final Random random;
private final double randomIOExceptionRate;

View File

@ -0,0 +1,74 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.basic;
import org.apache.lucene.util.English;
import org.elasticsearch.AbstractSharedClusterTest;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.store.mock.MockDirectoryHelper;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
public class SearchWithRandomExceptionsTests extends AbstractSharedClusterTest {
@Test
public void testRandomExceptions() throws IOException, InterruptedException, ExecutionException {
final int numShards = between(1, 5);
String mapping = XContentFactory.jsonBuilder().
startObject().
startObject("type").
startObject("properties").
startObject("test").field("type", "string").endObject().
endObject().
endObject()
.endObject().string();
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("index.number_of_shards", numShards)
.put("index.number_of_replicas", randomIntBetween(0, 1))
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE, randomBoolean() ? 1.0/between(10, 100) : 0.0)
.put(MockDirectoryHelper.RANDOM_IO_EXCEPTION_RATE_ON_OPEN, randomBoolean() ? 1.0/between(10, 100) : 0.0)
.put(MockDirectoryHelper.CHECK_INDEX_ON_CLOSE, true))
.addMapping("type", mapping).execute().actionGet();
client().admin().cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout(TimeValue.timeValueMillis(100))).get(); // it's ok to timeout here
int numDocs = between(10, 100);
for (int i = 0; i < numDocs ; i++) {
try {
client().prepareIndex("test", "type", "" + i).setSource("test", English.intToEnglish(i)).get();
} catch (ElasticSearchException ex) {
}
}
client().admin().indices().prepareRefresh("test").execute().get(); // don't assert on failures here
int numSearches = atLeast(10);
// we don't check anything here really just making sure we don't leave any open files or a broken index behind.
for (int i = 0; i < numSearches; i++) {
client().prepareSearch().setQuery(QueryBuilders.matchQuery("test", English.intToEnglish(between(0, numDocs)))).get();
}
}
}