From aa54e9cc35ca01d38cd62b215e539c5f083ac34d Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 24 Feb 2010 00:32:23 +0200 Subject: [PATCH] better handling of open inputs/outputs in case of recovery failure --- .../index/shard/recovery/RecoveryAction.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java index b0061ca5ff9..e3df6b81d08 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryAction.java @@ -226,10 +226,11 @@ public class RecoveryAction extends AbstractIndexShardComponent { for (final String name : snapshot.getFiles()) { threadPool.execute(new Runnable() { @Override public void run() { + IndexInput indexInput = null; try { final int BUFFER_SIZE = (int) fileChunkSize.bytes(); byte[] buf = new byte[BUFFER_SIZE]; - IndexInput indexInput = store.directory().openInput(name); + indexInput = store.directory().openInput(name); long len = indexInput.length(); long readCount = 0; while (readCount < len) { @@ -243,6 +244,13 @@ public class RecoveryAction extends AbstractIndexShardComponent { } catch (Exception e) { lastException.set(e); } finally { + if (indexInput != null) { + try { + indexInput.close(); + } catch (IOException e) { + // ignore + } + } latch.countDown(); } } @@ -424,11 +432,20 @@ public class RecoveryAction extends AbstractIndexShardComponent { indexOutput = openIndexOutputs.get(request.name); } synchronized (indexOutput) { - indexOutput.writeBytes(request.content, request.content.length); - if (indexOutput.getFilePointer() == request.length) { - // we are done - indexOutput.close(); + try { + indexOutput.writeBytes(request.content, request.content.length); + if (indexOutput.getFilePointer() == request.length) { + // we are done + indexOutput.close(); + openIndexOutputs.remove(request.name); + } + } catch (IOException e) { openIndexOutputs.remove(request.name); + try { + indexOutput.close(); + } catch (IOException e1) { + // ignore + } } } channel.sendResponse(VoidStreamable.INSTANCE);