Move ensureOpen calls under lock
We used to check on several places if we are still open but non of these places did the check under the lock which leaves a small window where we potentially get closed but still access an already closed channel or another IO resource.
This commit is contained in:
parent
9c1f930b8c
commit
3dfa146632
|
@ -46,8 +46,8 @@ public final class BufferingTranslogWriter extends TranslogWriter {
|
|||
|
||||
@Override
|
||||
public Translog.Location add(BytesReference data) throws IOException {
|
||||
ensureOpen();
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
operationCounter++;
|
||||
final long offset = totalOffset;
|
||||
if (data.length() >= buffer.length) {
|
||||
|
@ -107,19 +107,25 @@ public final class BufferingTranslogWriter extends TranslogWriter {
|
|||
return;
|
||||
}
|
||||
synchronized (this) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
flush();
|
||||
lastSyncedOffset = totalOffset;
|
||||
channelReference.incRef();
|
||||
try {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
flush();
|
||||
lastSyncedOffset = totalOffset;
|
||||
}
|
||||
// we can do this outside of the write lock but we have to protect from
|
||||
// concurrent syncs
|
||||
checkpoint(lastSyncedOffset, operationCounter, channelReference);
|
||||
} finally {
|
||||
channelReference.decRef();
|
||||
}
|
||||
// we can do this outside of the write lock but we have to protect from
|
||||
// concurrent syncs
|
||||
checkpoint(lastSyncedOffset, operationCounter, channelReference);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void updateBufferSize(int bufferSize) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
if (this.buffer.length != bufferSize) {
|
||||
flush();
|
||||
this.buffer = new byte[bufferSize];
|
||||
|
|
|
@ -1288,8 +1288,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
|
||||
@Override
|
||||
public void prepareCommit() throws IOException {
|
||||
ensureOpen();
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
if (currentCommittingTranslog != null) {
|
||||
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
|
||||
}
|
||||
|
@ -1321,9 +1321,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
|
||||
@Override
|
||||
public void commit() throws IOException {
|
||||
ensureOpen();
|
||||
ImmutableTranslogReader toClose = null;
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
if (currentCommittingTranslog == null) {
|
||||
prepareCommit();
|
||||
}
|
||||
|
|
|
@ -123,9 +123,9 @@ public class TranslogWriter extends TranslogReader {
|
|||
* add the given bytes to the translog and return the location they were written at
|
||||
*/
|
||||
public Translog.Location add(BytesReference data) throws IOException {
|
||||
ensureOpen();
|
||||
final long position;
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
position = writtenOffset;
|
||||
data.writeTo(channel);
|
||||
writtenOffset = writtenOffset + data.length();
|
||||
|
@ -200,9 +200,9 @@ public class TranslogWriter extends TranslogReader {
|
|||
* returns a new immutable reader which only exposes the current written operation *
|
||||
*/
|
||||
public ImmutableTranslogReader immutableReader() throws TranslogException {
|
||||
ensureOpen();
|
||||
if (channelReference.tryIncRef()) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
flush();
|
||||
ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter);
|
||||
channelReference.incRef(); // for new reader
|
||||
|
|
Loading…
Reference in New Issue