diff --git a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java index 0e84c73f47a..6026468973a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java @@ -47,6 +47,7 @@ public final class BufferingTranslogWriter extends TranslogWriter { @Override public Translog.Location add(BytesReference data) throws IOException { try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); operationCounter++; final long offset = totalOffset; if (data.length() >= buffer.length) { @@ -106,19 +107,25 @@ public final class BufferingTranslogWriter extends TranslogWriter { return; } synchronized (this) { - try (ReleasableLock lock = writeLock.acquire()) { - flush(); - lastSyncedOffset = totalOffset; + channelReference.incRef(); + try { + try (ReleasableLock lock = writeLock.acquire()) { + flush(); + lastSyncedOffset = totalOffset; + } + // we can do this outside of the write lock but we have to protect from + // concurrent syncs + checkpoint(lastSyncedOffset, operationCounter, channelReference); + } finally { + channelReference.decRef(); } - // we can do this outside of the write lock but we have to protect from - // concurrent syncs - checkpoint(lastSyncedOffset, operationCounter, channelReference); } } public void updateBufferSize(int bufferSize) { try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); if (this.buffer.length != bufferSize) { flush(); this.buffer = new byte[bufferSize]; diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index d2e3bb8d248..e6ff9344acd 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -407,6 +407,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC out.seek(end); final ReleasablePagedBytesReference bytes = out.bytes(); try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); Location location = current.add(bytes); if (config.isSyncOnEachOperation()) { current.sync(); @@ -414,6 +415,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC assert current.assertBytesAtLocation(location, bytes); return location; } + } catch (AlreadyClosedException ex) { + throw ex; } catch (Throwable e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { @@ -1285,8 +1288,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public void prepareCommit() throws IOException { - ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); if (currentCommittingTranslog != null) { throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); } @@ -1318,9 +1321,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public void commit() throws IOException { - ensureOpen(); ImmutableTranslogReader toClose = null; try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); if (currentCommittingTranslog == null) { prepareCommit(); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 2290dd69d87..045550cb628 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -123,9 +123,9 @@ public class TranslogWriter extends TranslogReader { * add the given bytes to the translog and return the location they were written at */ public Translog.Location add(BytesReference data) throws IOException { - ensureOpen(); final long position; try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); position = writtenOffset; data.writeTo(channel); writtenOffset = writtenOffset + data.length(); @@ -200,9 +200,9 @@ public class TranslogWriter extends TranslogReader { * returns a new immutable reader which only exposes the current written operation * */ public ImmutableTranslogReader immutableReader() throws TranslogException { - ensureOpen(); if (channelReference.tryIncRef()) { try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); flush(); ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter); channelReference.incRef(); // for new reader diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e76e09a6cfb..26faa02a17d 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -460,36 +460,7 @@ public class TranslogTests extends ESTestCase { final CountDownLatch downLatch = new CountDownLatch(1); for (int i = 0; i < threadCount; i++) { final int threadId = i; - threads[i] = new Thread(new Runnable() { - @Override - public void run() { - try { - downLatch.await(); - for (int opCount = 0; opCount < opsPerThread; opCount++) { - Translog.Operation op; - switch (randomFrom(Translog.Operation.Type.values())) { - case CREATE: - case INDEX: - op = new Translog.Index("test", threadId + "_" + opCount, - randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); - break; - case DELETE: - op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount), - 1 + randomInt(100000), - randomFrom(VersionType.values())); - break; - default: - throw new ElasticsearchException("not supported op type"); - } - - Translog.Location loc = translog.add(op); - writtenOperations.add(new LocationOperation(op, loc)); - } - } catch (Throwable t) { - threadExceptions[threadId] = t; - } - } - }); + threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions); threads[i].setDaemon(true); threads[i].start(); } @@ -1220,4 +1191,92 @@ public class TranslogTests extends ESTestCase { assertNull(snapshot.next()); } } + + public void testFailOnClosedWrite() throws IOException { + translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + translog.close(); + try { + translog.add(new Translog.Index("test", "1", Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + fail("closed"); + } catch (AlreadyClosedException ex) { + // all is welll + } + } + + public void testCloseConcurrently() throws Throwable { + final int opsPerThread = randomIntBetween(10, 200); + int threadCount = 2 + randomInt(5); + + logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread); + final BlockingQueue writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread); + + Thread[] threads = new Thread[threadCount]; + final Throwable[] threadExceptions = new Throwable[threadCount]; + final CountDownLatch downLatch = new CountDownLatch(1); + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, threadExceptions); + threads[i].setDaemon(true); + threads[i].start(); + } + + downLatch.countDown(); + translog.close(); + + for (int i = 0; i < threadCount; i++) { + if (threadExceptions[i] != null) { + if ((threadExceptions[i] instanceof AlreadyClosedException) == false) { + throw threadExceptions[i]; + } + } + threads[i].join(60 * 1000); + } + } + + private static class TranslogThread extends Thread { + private final CountDownLatch downLatch; + private final int opsPerThread; + private final int threadId; + private final BlockingQueue writtenOperations; + private final Throwable[] threadExceptions; + private final Translog translog; + + public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue writtenOperations, Throwable[] threadExceptions) { + this.translog = translog; + this.downLatch = downLatch; + this.opsPerThread = opsPerThread; + this.threadId = threadId; + this.writtenOperations = writtenOperations; + this.threadExceptions = threadExceptions; + } + + @Override + public void run() { + try { + downLatch.await(); + for (int opCount = 0; opCount < opsPerThread; opCount++) { + Translog.Operation op; + switch (randomFrom(Translog.Operation.Type.values())) { + case CREATE: + case INDEX: + op = new Translog.Index("test", threadId + "_" + opCount, + randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); + break; + case DELETE: + op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount), + 1 + randomInt(100000), + randomFrom(VersionType.values())); + break; + default: + throw new ElasticsearchException("not supported op type"); + } + + Translog.Location loc = translog.add(op); + writtenOperations.add(new LocationOperation(op, loc)); + } + } catch (Throwable t) { + threadExceptions[threadId] = t; + } + } + } }