diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 77181e8d6a7..d658b07de4a 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,7 +58,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { private final NodeSettingsService service = new NodeSettingsService(Settings.EMPTY); public void testSendFiles() throws Throwable { - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). + put("indices.recovery.concurrent_small_file_streams", 1).build(); + final RecoverySettings recoverySettings = new RecoverySettings(settings, service); StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), @@ -82,7 +85,13 @@ public class RecoverySourceHandlerTests extends ESTestCase { Store targetStore = newStore(createTempDir()); handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> { try { - return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)); + return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { + @Override + public void close() throws IOException { + super.close(); + store.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it + } + }; } catch (IOException e) { throw new RuntimeException(e); } @@ -98,7 +107,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { } public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). + put("indices.recovery.concurrent_small_file_streams", 1).build(); + final RecoverySettings recoverySettings = new RecoverySettings(settings, service); StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), @@ -138,7 +149,13 @@ public class RecoverySourceHandlerTests extends ESTestCase { try { handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> { try { - return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)); + return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { + @Override + public void close() throws IOException { + super.close(); + store.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it + } + }; } catch (IOException e) { throw new RuntimeException(e); } @@ -153,7 +170,9 @@ public class RecoverySourceHandlerTests extends ESTestCase { public void testHandleExceptinoOnSendSendFiles() throws Throwable { - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). + put("indices.recovery.concurrent_small_file_streams", 1).build(); + final RecoverySettings recoverySettings = new RecoverySettings(settings, service); StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),