diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 7a4a3e0722d..20333e94ee1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -168,7 +168,7 @@ public class ClusterModule extends AbstractModule { registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME); registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.BYTES_SIZE); + registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY); diff --git a/core/src/main/java/org/elasticsearch/cluster/settings/Validator.java b/core/src/main/java/org/elasticsearch/cluster/settings/Validator.java index 12049abed9b..cb253dceadf 100644 --- a/core/src/main/java/org/elasticsearch/cluster/settings/Validator.java +++ b/core/src/main/java/org/elasticsearch/cluster/settings/Validator.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.settings; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue; @@ -228,6 +229,21 @@ public interface Validator { } }; + Validator POSITIVE_BYTES_SIZE = new Validator() { + @Override + public String validate(String setting, String value, ClusterState state) { + try { + ByteSizeValue byteSizeValue = parseBytesSizeValue(value, setting); + if (byteSizeValue.getBytes() <= 0) { + return setting + " must be a positive byte size value"; + } + } catch (ElasticsearchParseException ex) { + return ex.getMessage(); + } + return null; + } + }; + Validator PERCENTAGE = new Validator() { @Override public String validate(String setting, String value, ClusterState clusterState) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 295ab49ac7f..9d0439dc167 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -293,7 +293,7 @@ public class RecoverySourceHandler { store.incRef(); final StoreFileMetaData md = recoverySourceMetadata.get(name); try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) { - final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes(); + final int BUFFER_SIZE = (int) Math.max(1, recoverySettings.fileChunkSize().bytes()); // at least one! final byte[] buf = new byte[BUFFER_SIZE]; boolean shouldCompressRequest = recoverySettings.compress(); if (CompressorFactory.isCompressed(indexInput)) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java index b4d5bf6471e..cef24543917 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java @@ -226,6 +226,9 @@ public class RecoveryStatus extends AbstractRefCounted { public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException { ensureRefCount(); String tempFileName = getTempNameForFile(fileName); + if (tempFileNames.containsKey(tempFileName)) { + throw new IllegalStateException("output for file [" + fileName + "] has already been created"); + } // add first, before it's created tempFileNames.put(tempFileName, fileName); IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java index 35847f51ab7..af5c2ef8b09 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java @@ -56,6 +56,13 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase { assertSame(openIndexOutput, indexOutput); openIndexOutput.writeInt(1); } + try { + status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8), status.store()); + fail("file foo.bar is already opened and registered"); + } catch (IllegalStateException ex) { + assertEquals("output for file [foo.bar] has already been created", ex.getMessage()); + // all well = it's already registered + } status.removeOpenIndexOutputs("foo.bar"); Set strings = Sets.newHashSet(status.store().directory().listAll()); String expectedFile = null; diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index c73baac193e..53a71e1dc7b 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -402,7 +402,7 @@ public class RelocationIT extends ESIntegTestCase { // Slow down recovery in order to make recovery cancellations more likely IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexName).get(); - long chunkSize = statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10; + long chunkSize = Math.max(1, statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10); assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() // one chunk per sec..