mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-23 21:38:15 +00:00
Endless recovery loop with indices.recovery.file_chunk_size=0Bytes
This is caused by sending the same file to the chunk handler with offset `0` which in-turn opens a new outputstream and waits for bytes. But the next round will send 0 bytes again with offset 0. This commit adds some checks / validators that those settings are positive byte values and fixes the RecoveryStatus to throw an IAE if the same file is opened twice.
This commit is contained in:
parent
7765b0497d
commit
59f390f5d0
@ -168,7 +168,7 @@ public class ClusterModule extends AbstractModule {
|
||||
registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
|
||||
registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME);
|
||||
registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY);
|
||||
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.BYTES_SIZE);
|
||||
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE);
|
||||
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER);
|
||||
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE);
|
||||
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY);
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.settings;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue;
|
||||
@ -228,6 +229,21 @@ public interface Validator {
|
||||
}
|
||||
};
|
||||
|
||||
Validator POSITIVE_BYTES_SIZE = new Validator() {
|
||||
@Override
|
||||
public String validate(String setting, String value, ClusterState state) {
|
||||
try {
|
||||
ByteSizeValue byteSizeValue = parseBytesSizeValue(value, setting);
|
||||
if (byteSizeValue.getBytes() <= 0) {
|
||||
return setting + " must be a positive byte size value";
|
||||
}
|
||||
} catch (ElasticsearchParseException ex) {
|
||||
return ex.getMessage();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
Validator PERCENTAGE = new Validator() {
|
||||
@Override
|
||||
public String validate(String setting, String value, ClusterState clusterState) {
|
||||
|
@ -293,7 +293,7 @@ public class RecoverySourceHandler {
|
||||
store.incRef();
|
||||
final StoreFileMetaData md = recoverySourceMetadata.get(name);
|
||||
try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
|
||||
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
|
||||
final int BUFFER_SIZE = (int) Math.max(1, recoverySettings.fileChunkSize().bytes()); // at least one!
|
||||
final byte[] buf = new byte[BUFFER_SIZE];
|
||||
boolean shouldCompressRequest = recoverySettings.compress();
|
||||
if (CompressorFactory.isCompressed(indexInput)) {
|
||||
|
@ -226,6 +226,9 @@ public class RecoveryStatus extends AbstractRefCounted {
|
||||
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
|
||||
ensureRefCount();
|
||||
String tempFileName = getTempNameForFile(fileName);
|
||||
if (tempFileNames.containsKey(tempFileName)) {
|
||||
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
|
||||
}
|
||||
// add first, before it's created
|
||||
tempFileNames.put(tempFileName, fileName);
|
||||
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
|
||||
|
@ -56,6 +56,13 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
|
||||
assertSame(openIndexOutput, indexOutput);
|
||||
openIndexOutput.writeInt(1);
|
||||
}
|
||||
try {
|
||||
status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8), status.store());
|
||||
fail("file foo.bar is already opened and registered");
|
||||
} catch (IllegalStateException ex) {
|
||||
assertEquals("output for file [foo.bar] has already been created", ex.getMessage());
|
||||
// all well = it's already registered
|
||||
}
|
||||
status.removeOpenIndexOutputs("foo.bar");
|
||||
Set<String> strings = Sets.newHashSet(status.store().directory().listAll());
|
||||
String expectedFile = null;
|
||||
|
@ -402,7 +402,7 @@ public class RelocationIT extends ESIntegTestCase {
|
||||
|
||||
// Slow down recovery in order to make recovery cancellations more likely
|
||||
IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexName).get();
|
||||
long chunkSize = statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10;
|
||||
long chunkSize = Math.max(1, statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10);
|
||||
assertTrue(client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder()
|
||||
// one chunk per sec..
|
||||
|
Loading…
x
Reference in New Issue
Block a user