Make chunkSize configurabel for tests and use correct close handling for closing streams to not hide original exception

This commit is contained in:
Simon Willnauer 2015-12-11 09:44:08 +01:00
parent 7d6663e6e5
commit 12f905a675
5 changed files with 54 additions and 45 deletions

View File

@ -20,6 +20,8 @@
package org.elasticsearch.common.io;
import java.nio.charset.StandardCharsets;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.util.Callback;
import java.io.BufferedReader;
@ -68,6 +70,7 @@ public abstract class Streams {
public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
boolean success = false;
try {
long byteCount = 0;
int bytesRead;
@ -76,17 +79,13 @@ public abstract class Streams {
byteCount += bytesRead;
}
out.flush();
success = true;
return byteCount;
} finally {
try {
in.close();
} catch (IOException ex) {
// do nothing
}
try {
out.close();
} catch (IOException ex) {
// do nothing
if (success) {
IOUtils.close(in, out);
} else {
IOUtils.closeWhileHandlingException(in, out);
}
}
}
@ -130,6 +129,7 @@ public abstract class Streams {
public static int copy(Reader in, Writer out) throws IOException {
Objects.requireNonNull(in, "No Reader specified");
Objects.requireNonNull(out, "No Writer specified");
boolean success = false;
try {
int byteCount = 0;
char[] buffer = new char[BUFFER_SIZE];
@ -139,17 +139,13 @@ public abstract class Streams {
byteCount += bytesRead;
}
out.flush();
success = true;
return byteCount;
} finally {
try {
in.close();
} catch (IOException ex) {
// do nothing
}
try {
out.close();
} catch (IOException ex) {
// do nothing
if (success) {
IOUtils.close(in, out);
} else {
IOUtils.closeWhileHandlingException(in, out);
}
}
}

View File

@ -71,6 +71,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes();
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
private volatile int concurrentStreams;
private volatile int concurrentSmallFileStreams;
private final ThreadPoolExecutor concurrentStreamPool;
@ -84,6 +86,8 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionLongTimeout;
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
@Inject
public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
@ -160,6 +164,15 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
return internalActionLongTimeout;
}
public ByteSizeValue getChunkSize() { return chunkSize; }
void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
this.chunkSize = chunkSize;
}
class ApplySettings implements NodeSettingsService.Listener {
@Override

View File

@ -72,7 +72,6 @@ import java.util.stream.StreamSupport;
*/
public class RecoverySourceHandler {
private static final int CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB).bytesAsInt();
protected final ESLogger logger;
// Shard that is going to be recovered (the "source")
private final IndexShard shard;
@ -82,6 +81,7 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
private final int chunkSizeInBytes;
protected final RecoveryResponse response;
@ -110,7 +110,7 @@ public class RecoverySourceHandler {
this.transportService = transportService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();
this.chunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
this.response = new RecoveryResponse();
}
@ -247,7 +247,7 @@ public class RecoverySourceHandler {
});
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), CHUNK_SIZE);
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
cancellableThreads.execute(() -> {
// Send the CLEAN_FILES request, which takes all of the files that
@ -451,7 +451,7 @@ public class RecoverySourceHandler {
// Check if this request is past bytes threshold, and
// if so, send it off
if (size >= CHUNK_SIZE) {
if (size >= chunkSizeInBytes) {
// don't throttle translog, since we lock for phase3 indexing,
// so we need to move it as fast as possible. Note, since we
@ -526,7 +526,6 @@ public class RecoverySourceHandler {
private final AtomicLong bytesSinceLastPause;
private final Translog.View translogView;
private long position = 0;
private final AtomicBoolean failed = new AtomicBoolean(false);
RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) {
this.md = md;
@ -541,24 +540,9 @@ public class RecoverySourceHandler {
@Override
public final void write(byte[] b, int offset, int length) throws IOException {
if (failed.get() == false) {
/* since we are an outputstream a wrapper might get flushed on close after we threw an exception.
* that might cause another exception from the other side of the recovery since we are in a bad state
* due to a corrupted file stream etc. the biggest issue is that we will turn into a loop of exceptions
* and we will always suppress the original one which might cause the recovery to retry over and over again.
* To prevent this we try to not send chunks again after we failed once.*/
boolean success = false;
try {
sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
position += length;
assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
success = true;
} finally {
if (success == false) {
failed.compareAndSet(false, true);
}
}
}
sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
position += length;
assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
}
private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
@ -689,9 +673,10 @@ public class RecoverySourceHandler {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
Future<Void> future = pool.submit(() -> {
try (final OutputStream outputStream = outputStreamFactory.apply(md);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
// it's fine that we are only having the indexInput int he try/with block. The copy methods handles
// exceptions during close correctly and doesn't hide the original exception.
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
}
return null;
});

View File

@ -138,7 +138,10 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = shardSize.bytes() / 5;
long chunkSize = Math.max(1, shardSize.bytes() / 10);
for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) {
setChunkSize(settings, new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES));
}
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
// one chunk per sec..
@ -148,6 +151,9 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
private void restoreRecoverySpeed() {
for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) {
setChunkSize(settings, RecoverySettings.DEFAULT_CHUNK_SIZE);
}
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb")
@ -629,4 +635,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
transport.sendRequest(node, requestId, action, request, options);
}
}
public static void setChunkSize(RecoverySettings recoverySettings, ByteSizeValue chunksSize) {
recoverySettings.setChunkSize(chunksSize);
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.recovery.IndexRecoveryIT;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryTarget;
@ -71,6 +72,10 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
* Later we allow full recovery to ensure we can still recover and don't run into corruptions.
*/
public void testCancelRecoveryAndResume() throws Exception {
for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) {
IndexRecoveryIT.setChunkSize(settings, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES));
}
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {