LUCENE-7792: add try/finally to make sure semaphore is released on exceptions

This commit is contained in:
Mike McCandless 2017-04-26 09:36:14 -04:00
parent f45017b2d4
commit 25f1dd2f9b
1 changed files with 71 additions and 57 deletions

View File

@ -273,6 +273,9 @@ public class OfflineSorter {
while (true) {
Partition part = readPartition(is);
if (part.count == 0) {
if (partitionsInRAM != null) {
partitionsInRAM.release();
}
assert part.exhausted;
break;
}
@ -317,16 +320,7 @@ public class OfflineSorter {
result = out.getName();
}
} else {
try {
result = segments.get(0).get().fileName;
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
} catch (ExecutionException ee) {
IOUtils.reThrow(ee.getCause());
// dead code but javac disagrees:
result = null;
}
result = getPartition(segments.get(0)).fileName;
}
// We should be explicitly removing all intermediate files ourselves unless there is an exception:
@ -406,52 +400,59 @@ public class OfflineSorter {
if (partitionsInRAM != null) {
partitionsInRAM.acquire();
}
long start = System.currentTimeMillis();
SortableBytesRefArray buffer;
boolean exhausted = false;
int count;
if (valueLength != -1) {
// fixed length case
buffer = new FixedLengthBytesRefArray(valueLength);
int limit = ramBufferSize.bytes / valueLength;
for(int i=0;i<limit;i++) {
BytesRef item = null;
try {
item = reader.next();
} catch (Throwable t) {
verifyChecksum(t, reader);
boolean success = false;
try {
long start = System.currentTimeMillis();
SortableBytesRefArray buffer;
boolean exhausted = false;
int count;
if (valueLength != -1) {
// fixed length case
buffer = new FixedLengthBytesRefArray(valueLength);
int limit = ramBufferSize.bytes / valueLength;
for(int i=0;i<limit;i++) {
BytesRef item = null;
try {
item = reader.next();
} catch (Throwable t) {
verifyChecksum(t, reader);
}
if (item == null) {
exhausted = true;
break;
}
buffer.append(item);
}
if (item == null) {
exhausted = true;
break;
} else {
Counter bufferBytesUsed = Counter.newCounter();
buffer = new BytesRefArray(bufferBytesUsed);
while (true) {
BytesRef item = null;
try {
item = reader.next();
} catch (Throwable t) {
verifyChecksum(t, reader);
}
if (item == null) {
exhausted = true;
break;
}
buffer.append(item);
// Account for the created objects.
// (buffer slots do not account to buffer size.)
if (bufferBytesUsed.get() > ramBufferSize.bytes) {
break;
}
}
buffer.append(item);
}
} else {
Counter bufferBytesUsed = Counter.newCounter();
buffer = new BytesRefArray(bufferBytesUsed);
while (true) {
BytesRef item = null;
try {
item = reader.next();
} catch (Throwable t) {
verifyChecksum(t, reader);
}
if (item == null) {
exhausted = true;
break;
}
buffer.append(item);
// Account for the created objects.
// (buffer slots do not account to buffer size.)
if (bufferBytesUsed.get() > ramBufferSize.bytes) {
break;
}
sortInfo.readTimeMS += System.currentTimeMillis() - start;
success = true;
return new Partition(buffer, exhausted);
} finally {
if (success == false && partitionsInRAM != null) {
partitionsInRAM.release();
}
}
sortInfo.readTimeMS += System.currentTimeMillis() - start;
return new Partition(buffer, exhausted);
}
static class FileAndTop {
@ -616,15 +617,28 @@ public class OfflineSorter {
CodecUtil.writeFooter(out.out);
part.buffer.clear();
return new Partition(tempFile.getName(), part.count);
} finally {
if (partitionsInRAM != null) {
partitionsInRAM.release();
}
return new Partition(tempFile.getName(), part.count);
}
}
}
private Partition getPartition(Future<Partition> future) throws IOException {
try {
return future.get();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
} catch (ExecutionException ee) {
IOUtils.reThrow(ee.getCause());
// oh so soon to go away:
return null;
}
}
/** Merges multiple file-based partitions to a single on-disk partition. */
private class MergePartitionsTask implements Callable<Partition> {
private final Directory dir;
@ -636,10 +650,10 @@ public class OfflineSorter {
}
@Override
public Partition call() throws IOException, InterruptedException, ExecutionException {
public Partition call() throws IOException {
long totalCount = 0;
for (Future<Partition> segment : segmentsToMerge) {
totalCount += segment.get().count;
totalCount += getPartition(segment).count;
}
PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
@ -660,7 +674,7 @@ public class OfflineSorter {
// Open streams and read the top for each file
for (int i = 0; i < segmentsToMerge.size(); i++) {
Partition segment = segmentsToMerge.get(i).get();
Partition segment = getPartition(segmentsToMerge.get(i));
streams[i] = getReader(dir.openChecksumInput(segment.fileName, IOContext.READONCE), segment.fileName);
BytesRef item = null;
@ -705,7 +719,7 @@ public class OfflineSorter {
}
List<String> toDelete = new ArrayList<>();
for (Future<Partition> segment : segmentsToMerge) {
toDelete.add(segment.get().fileName);
toDelete.add(getPartition(segment).fileName);
}
IOUtils.deleteFiles(dir, toDelete);