better handling of open inputs/outputs in case of recovery failure

This commit is contained in:
kimchy 2010-02-24 00:32:23 +02:00
parent d1a4989e84
commit aa54e9cc35
1 changed files with 22 additions and 5 deletions

View File

@ -226,10 +226,11 @@ public class RecoveryAction extends AbstractIndexShardComponent {
for (final String name : snapshot.getFiles()) { for (final String name : snapshot.getFiles()) {
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
IndexInput indexInput = null;
try { try {
final int BUFFER_SIZE = (int) fileChunkSize.bytes(); final int BUFFER_SIZE = (int) fileChunkSize.bytes();
byte[] buf = new byte[BUFFER_SIZE]; byte[] buf = new byte[BUFFER_SIZE];
IndexInput indexInput = store.directory().openInput(name); indexInput = store.directory().openInput(name);
long len = indexInput.length(); long len = indexInput.length();
long readCount = 0; long readCount = 0;
while (readCount < len) { while (readCount < len) {
@ -243,6 +244,13 @@ public class RecoveryAction extends AbstractIndexShardComponent {
} catch (Exception e) { } catch (Exception e) {
lastException.set(e); lastException.set(e);
} finally { } finally {
if (indexInput != null) {
try {
indexInput.close();
} catch (IOException e) {
// ignore
}
}
latch.countDown(); latch.countDown();
} }
} }
@ -424,12 +432,21 @@ public class RecoveryAction extends AbstractIndexShardComponent {
indexOutput = openIndexOutputs.get(request.name); indexOutput = openIndexOutputs.get(request.name);
} }
synchronized (indexOutput) { synchronized (indexOutput) {
try {
indexOutput.writeBytes(request.content, request.content.length); indexOutput.writeBytes(request.content, request.content.length);
if (indexOutput.getFilePointer() == request.length) { if (indexOutput.getFilePointer() == request.length) {
// we are done // we are done
indexOutput.close(); indexOutput.close();
openIndexOutputs.remove(request.name); openIndexOutputs.remove(request.name);
} }
} catch (IOException e) {
openIndexOutputs.remove(request.name);
try {
indexOutput.close();
} catch (IOException e1) {
// ignore
}
}
} }
channel.sendResponse(VoidStreamable.INSTANCE); channel.sendResponse(VoidStreamable.INSTANCE);
} }