Merge pull request #15012 from s1monw/issues/14866

Prevent writing to closed channel if translog is already closed
This commit is contained in:
Simon Willnauer 2015-11-26 14:47:38 +01:00
commit e363e0c2aa
4 changed files with 109 additions and 40 deletions

View File

@ -47,6 +47,7 @@ public final class BufferingTranslogWriter extends TranslogWriter {
@Override @Override
public Translog.Location add(BytesReference data) throws IOException { public Translog.Location add(BytesReference data) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
operationCounter++; operationCounter++;
final long offset = totalOffset; final long offset = totalOffset;
if (data.length() >= buffer.length) { if (data.length() >= buffer.length) {
@ -106,6 +107,8 @@ public final class BufferingTranslogWriter extends TranslogWriter {
return; return;
} }
synchronized (this) { synchronized (this) {
channelReference.incRef();
try {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
flush(); flush();
lastSyncedOffset = totalOffset; lastSyncedOffset = totalOffset;
@ -113,12 +116,16 @@ public final class BufferingTranslogWriter extends TranslogWriter {
// we can do this outside of the write lock but we have to protect from // we can do this outside of the write lock but we have to protect from
// concurrent syncs // concurrent syncs
checkpoint(lastSyncedOffset, operationCounter, channelReference); checkpoint(lastSyncedOffset, operationCounter, channelReference);
} finally {
channelReference.decRef();
}
} }
} }
public void updateBufferSize(int bufferSize) { public void updateBufferSize(int bufferSize) {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (this.buffer.length != bufferSize) { if (this.buffer.length != bufferSize) {
flush(); flush();
this.buffer = new byte[bufferSize]; this.buffer = new byte[bufferSize];

View File

@ -407,6 +407,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
out.seek(end); out.seek(end);
final ReleasablePagedBytesReference bytes = out.bytes(); final ReleasablePagedBytesReference bytes = out.bytes();
try (ReleasableLock lock = readLock.acquire()) { try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Location location = current.add(bytes); Location location = current.add(bytes);
if (config.isSyncOnEachOperation()) { if (config.isSyncOnEachOperation()) {
current.sync(); current.sync();
@ -414,6 +415,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
assert current.assertBytesAtLocation(location, bytes); assert current.assertBytesAtLocation(location, bytes);
return location; return location;
} }
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Throwable e) { } catch (Throwable e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally { } finally {
@ -1285,8 +1288,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override @Override
public void prepareCommit() throws IOException { public void prepareCommit() throws IOException {
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingTranslog != null) { if (currentCommittingTranslog != null) {
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
} }
@ -1318,9 +1321,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override @Override
public void commit() throws IOException { public void commit() throws IOException {
ensureOpen();
ImmutableTranslogReader toClose = null; ImmutableTranslogReader toClose = null;
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
if (currentCommittingTranslog == null) { if (currentCommittingTranslog == null) {
prepareCommit(); prepareCommit();
} }

View File

@ -123,9 +123,9 @@ public class TranslogWriter extends TranslogReader {
* add the given bytes to the translog and return the location they were written at * add the given bytes to the translog and return the location they were written at
*/ */
public Translog.Location add(BytesReference data) throws IOException { public Translog.Location add(BytesReference data) throws IOException {
ensureOpen();
final long position; final long position;
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
position = writtenOffset; position = writtenOffset;
data.writeTo(channel); data.writeTo(channel);
writtenOffset = writtenOffset + data.length(); writtenOffset = writtenOffset + data.length();
@ -200,9 +200,9 @@ public class TranslogWriter extends TranslogReader {
* returns a new immutable reader which only exposes the current written operation * * returns a new immutable reader which only exposes the current written operation *
*/ */
public ImmutableTranslogReader immutableReader() throws TranslogException { public ImmutableTranslogReader immutableReader() throws TranslogException {
ensureOpen();
if (channelReference.tryIncRef()) { if (channelReference.tryIncRef()) {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
flush(); flush();
ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter); ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter);
channelReference.incRef(); // for new reader channelReference.incRef(); // for new reader

View File

@ -460,36 +460,7 @@ public class TranslogTests extends ESTestCase {
final CountDownLatch downLatch = new CountDownLatch(1); final CountDownLatch downLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
final int threadId = i; final int threadId = i;
threads[i] = new Thread(new Runnable() { threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
@Override
public void run() {
try {
downLatch.await();
for (int opCount = 0; opCount < opsPerThread; opCount++) {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
default:
throw new ElasticsearchException("not supported op type");
}
Translog.Location loc = translog.add(op);
writtenOperations.add(new LocationOperation(op, loc));
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}
});
threads[i].setDaemon(true); threads[i].setDaemon(true);
threads[i].start(); threads[i].start();
} }
@ -1220,4 +1191,92 @@ public class TranslogTests extends ESTestCase {
assertNull(snapshot.next()); assertNull(snapshot.next());
} }
} }
public void testFailOnClosedWrite() throws IOException {
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
translog.close();
try {
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
fail("closed");
} catch (AlreadyClosedException ex) {
// all is welll
}
}
public void testCloseConcurrently() throws Throwable {
final int opsPerThread = randomIntBetween(10, 200);
int threadCount = 2 + randomInt(5);
logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread);
final BlockingQueue<LocationOperation> writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread);
Thread[] threads = new Thread[threadCount];
final Throwable[] threadExceptions = new Throwable[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
threads[i].setDaemon(true);
threads[i].start();
}
downLatch.countDown();
translog.close();
for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null) {
if ((threadExceptions[i] instanceof AlreadyClosedException) == false) {
throw threadExceptions[i];
}
}
threads[i].join(60 * 1000);
}
}
private static class TranslogThread extends Thread {
private final CountDownLatch downLatch;
private final int opsPerThread;
private final int threadId;
private final BlockingQueue<LocationOperation> writtenOperations;
private final Throwable[] threadExceptions;
private final Translog translog;
public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
this.translog = translog;
this.downLatch = downLatch;
this.opsPerThread = opsPerThread;
this.threadId = threadId;
this.writtenOperations = writtenOperations;
this.threadExceptions = threadExceptions;
}
@Override
public void run() {
try {
downLatch.await();
for (int opCount = 0; opCount < opsPerThread; opCount++) {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
default:
throw new ElasticsearchException("not supported op type");
}
Translog.Location loc = translog.add(op);
writtenOperations.add(new LocationOperation(op, loc));
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}
}
} }