Prevent writing to closed channel if translog is already closed
We handle AlreadyClosedExceptions gracefully wherever IndexShard / Engine is used. In some cases, instead of throwing the appropriate exception we bubble up ChannelClosedException instead which causes shard failures etc. Today, it seems like that this can only happen if the engine is closed without acquireing the lock which means that the shard has failed already so the impact is really just a confusing log message. Yet, this change enforces throwing the right exception if the translog is already closed. Closes #14866
This commit is contained in:
parent
e2447c7ce0
commit
9c1f930b8c
|
@ -46,6 +46,7 @@ public final class BufferingTranslogWriter extends TranslogWriter {
|
|||
|
||||
@Override
|
||||
public Translog.Location add(BytesReference data) throws IOException {
|
||||
ensureOpen();
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
operationCounter++;
|
||||
final long offset = totalOffset;
|
||||
|
|
|
@ -407,6 +407,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
out.seek(end);
|
||||
final ReleasablePagedBytesReference bytes = out.bytes();
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
Location location = current.add(bytes);
|
||||
if (config.isSyncOnEachOperation()) {
|
||||
current.sync();
|
||||
|
@ -414,6 +415,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
assert current.assertBytesAtLocation(location, bytes);
|
||||
return location;
|
||||
}
|
||||
} catch (AlreadyClosedException ex) {
|
||||
throw ex;
|
||||
} catch (Throwable e) {
|
||||
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
|
||||
} finally {
|
||||
|
|
|
@ -460,36 +460,7 @@ public class TranslogTests extends ESTestCase {
|
|||
final CountDownLatch downLatch = new CountDownLatch(1);
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
final int threadId = i;
|
||||
threads[i] = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
downLatch.await();
|
||||
for (int opCount = 0; opCount < opsPerThread; opCount++) {
|
||||
Translog.Operation op;
|
||||
switch (randomFrom(Translog.Operation.Type.values())) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
op = new Translog.Index("test", threadId + "_" + opCount,
|
||||
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
|
||||
break;
|
||||
case DELETE:
|
||||
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
|
||||
1 + randomInt(100000),
|
||||
randomFrom(VersionType.values()));
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchException("not supported op type");
|
||||
}
|
||||
|
||||
Translog.Location loc = translog.add(op);
|
||||
writtenOperations.add(new LocationOperation(op, loc));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
threadExceptions[threadId] = t;
|
||||
}
|
||||
}
|
||||
});
|
||||
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
|
||||
threads[i].setDaemon(true);
|
||||
threads[i].start();
|
||||
}
|
||||
|
@ -1220,4 +1191,92 @@ public class TranslogTests extends ESTestCase {
|
|||
assertNull(snapshot.next());
|
||||
}
|
||||
}
|
||||
|
||||
public void testFailOnClosedWrite() throws IOException {
|
||||
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
||||
translog.close();
|
||||
try {
|
||||
translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
||||
fail("closed");
|
||||
} catch (AlreadyClosedException ex) {
|
||||
// all is welll
|
||||
}
|
||||
}
|
||||
|
||||
public void testCloseConcurrently() throws Throwable {
|
||||
final int opsPerThread = randomIntBetween(10, 200);
|
||||
int threadCount = 2 + randomInt(5);
|
||||
|
||||
logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread);
|
||||
final BlockingQueue<LocationOperation> writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread);
|
||||
|
||||
Thread[] threads = new Thread[threadCount];
|
||||
final Throwable[] threadExceptions = new Throwable[threadCount];
|
||||
final CountDownLatch downLatch = new CountDownLatch(1);
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
final int threadId = i;
|
||||
threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions);
|
||||
threads[i].setDaemon(true);
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
downLatch.countDown();
|
||||
translog.close();
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
if (threadExceptions[i] != null) {
|
||||
if ((threadExceptions[i] instanceof AlreadyClosedException) == false) {
|
||||
throw threadExceptions[i];
|
||||
}
|
||||
}
|
||||
threads[i].join(60 * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TranslogThread extends Thread {
|
||||
private final CountDownLatch downLatch;
|
||||
private final int opsPerThread;
|
||||
private final int threadId;
|
||||
private final BlockingQueue<LocationOperation> writtenOperations;
|
||||
private final Throwable[] threadExceptions;
|
||||
private final Translog translog;
|
||||
|
||||
public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
|
||||
this.translog = translog;
|
||||
this.downLatch = downLatch;
|
||||
this.opsPerThread = opsPerThread;
|
||||
this.threadId = threadId;
|
||||
this.writtenOperations = writtenOperations;
|
||||
this.threadExceptions = threadExceptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
downLatch.await();
|
||||
for (int opCount = 0; opCount < opsPerThread; opCount++) {
|
||||
Translog.Operation op;
|
||||
switch (randomFrom(Translog.Operation.Type.values())) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
op = new Translog.Index("test", threadId + "_" + opCount,
|
||||
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
|
||||
break;
|
||||
case DELETE:
|
||||
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
|
||||
1 + randomInt(100000),
|
||||
randomFrom(VersionType.values()));
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchException("not supported op type");
|
||||
}
|
||||
|
||||
Translog.Location loc = translog.add(op);
|
||||
writtenOperations.add(new LocationOperation(op, loc));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
threadExceptions[threadId] = t;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue