diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 74cac49b76d..d9ee2f4177a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -231,7 +231,8 @@ public class InternalEngine extends Engine { protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogGeneration translogGeneration) throws IOException { int opsRecovered = 0; final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer(); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try { + Translog.Snapshot snapshot = translog.newSnapshot(); Translog.Operation operation; while ((operation = snapshot.next()) != null) { try { diff --git a/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java new file mode 100644 index 00000000000..c98ea69f87f --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -0,0 +1,136 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; + +/** + * A base class for all classes that allow reading ops from translog files + */ +public abstract class BaseTranslogReader implements Comparable<BaseTranslogReader> { + + protected final long generation; + protected final FileChannel channel; + protected final Path path; + protected final long firstOperationOffset; + + public BaseTranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset) { + assert Translog.parseIdFromFileName(path) == generation : "generation mismatch.
Path: " + Translog.parseIdFromFileName(path) + " but generation: " + generation; + + this.generation = generation; + this.path = path; + this.channel = channel; + this.firstOperationOffset = firstOperationOffset; + } + + public long getGeneration() { + return this.generation; + } + + public abstract long sizeInBytes(); + + abstract public int totalOperations(); + + public final long getFirstOperationOffset() { + return firstOperationOffset; + } + + public Translog.Operation read(Translog.Location location) throws IOException { + assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]"; + ByteBuffer buffer = ByteBuffer.allocate(location.size); + try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) { + return read(checksumStreamInput); + } + } + + /** read the size of the op (i.e., number of bytes, including the op size) written at the given position */ + protected final int readSize(ByteBuffer reusableBuffer, long position) { + // read op size from disk + assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]"; + try { + reusableBuffer.clear(); + reusableBuffer.limit(4); + readBytes(reusableBuffer, position); + reusableBuffer.flip(); + // Add an extra 4 to account for the operation size integer itself + final int size = reusableBuffer.getInt() + 4; + final long maxSize = sizeInBytes() - position; + if (size < 0 || size > maxSize) { + throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size); + } + + return size; + } catch (IOException e) { + throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.path, e); + } + } + + public Translog.Snapshot newSnapshot() { + return new TranslogSnapshot(generation, channel, path, firstOperationOffset, sizeInBytes(), totalOperations()); + } + + /** + * reads an operation at the given position and returns it. The buffer length is equal to the number + * of bytes reads. + */ + protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException { + final ByteBuffer buffer; + if (reusableBuffer.capacity() >= opSize) { + buffer = reusableBuffer; + } else { + buffer = ByteBuffer.allocate(opSize); + } + buffer.clear(); + buffer.limit(opSize); + readBytes(buffer, position); + buffer.flip(); + return new BufferedChecksumStreamInput(new ByteBufferStreamInput(buffer), reuse); + } + + protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException { + return Translog.readOperation(inStream); + } + + /** + * reads bytes at position into the given buffer, filling it. 
+ */ + abstract protected void readBytes(ByteBuffer buffer, long position) throws IOException; + + @Override + public String toString() { + return "translog [" + generation + "][" + path + "]"; + } + + @Override + public int compareTo(BaseTranslogReader o) { + return Long.compare(getGeneration(), o.getGeneration()); + } + + + public Path path() { + return path; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/translog/ChannelReference.java b/core/src/main/java/org/elasticsearch/index/translog/ChannelReference.java deleted file mode 100644 index b3f60a4c89f..00000000000 --- a/core/src/main/java/org/elasticsearch/index/translog/ChannelReference.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.common.util.Callback; -import org.elasticsearch.common.util.concurrent.AbstractRefCounted; - -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.file.Path; - -final class ChannelReference extends AbstractRefCounted { - private final Path file; - private final FileChannel channel; - protected final long generation; - private final Callback onClose; - - ChannelReference(Path file, long generation, FileChannel channel, Callback onClose) throws IOException { - super(file.toString()); - this.generation = generation; - this.file = file; - this.channel = channel; - this.onClose = onClose; - } - - public long getGeneration() { - return generation; - } - - public Path getPath() { - return this.file; - } - - public FileChannel getChannel() { - return this.channel; - } - - @Override - public String toString() { - return "channel: file [" + file + "], ref count [" + refCount() + "]"; - } - - @Override - protected void closeInternal() { - try { - IOUtils.closeWhileHandlingException(channel); - } finally { - if (onClose != null) { - onClose.handle(this); - } - } - } -} diff --git a/core/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReader.java deleted file mode 100644 index 463c5998f1d..00000000000 --- a/core/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReader.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import java.io.IOException; - -/** - * Version 0 of the translog format, there is no header in this file - */ -@Deprecated -public final class LegacyTranslogReader extends LegacyTranslogReaderBase { - - /** - * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point - * at the end of the last operation in this snapshot. - */ - LegacyTranslogReader(long generation, ChannelReference channelReference, long fileLength) { - super(generation, channelReference, 0, fileLength); - } - - @Override - protected Translog.Operation read(BufferedChecksumStreamInput in) throws IOException { - // read the opsize before an operation. - // Note that this was written & read out side of the stream when this class was used, but it makes things more consistent - // to read this here - in.readInt(); - Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte()); - Translog.Operation operation = Translog.newOperationFromType(type); - operation.readFrom(in); - return operation; - } - - - - @Override - protected ImmutableTranslogReader newReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) { - assert totalOperations == -1 : "expected unknown but was: " + totalOperations; - assert firstOperationOffset == 0; - return new LegacyTranslogReader(generation, channelReference, length); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java b/core/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java deleted file mode 100644 index d9e9e17f792..00000000000 --- a/core/src/main/java/org/elasticsearch/index/translog/LegacyTranslogReaderBase.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * Version 1 of the translog format, there is checkpoint and therefore no notion of op count - */ -@Deprecated -class LegacyTranslogReaderBase extends ImmutableTranslogReader { - - /** - * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point - * at the end of the last operation in this snapshot. 
- * - */ - LegacyTranslogReaderBase(long generation, ChannelReference channelReference, long firstOperationOffset, long fileLength) { - super(generation, channelReference, firstOperationOffset, fileLength, TranslogReader.UNKNOWN_OP_COUNT); - } - - - @Override - protected Translog.Snapshot newReaderSnapshot(final int totalOperations, ByteBuffer reusableBuffer) { - assert totalOperations == -1 : "legacy we had no idea how many ops: " + totalOperations; - return new ReaderSnapshot(totalOperations, reusableBuffer) { - @Override - public Translog.Operation next() throws IOException { - if (position >= sizeInBytes()) { // this is the legacy case.... - return null; - } - try { - return readOperation(); - } catch (TruncatedTranslogException ex) { - return null; // legacy case - } - } - }; - } - - @Override - protected ImmutableTranslogReader newReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) { - assert totalOperations == -1 : "expected unknown but was: " + totalOperations; - return new LegacyTranslogReaderBase(generation, channelReference, firstOperationOffset, length); - } -} diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index b76214dc2e7..7b1a05e1ac1 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -19,12 +19,8 @@ package org.elasticsearch.index.translog; -import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.lease.Releasables; - import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Arrays; /** * A snapshot composed out of multiple snapshots @@ -32,8 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; final class MultiSnapshot implements Translog.Snapshot { private final Translog.Snapshot[] translogs; - private AtomicBoolean closed = new AtomicBoolean(false); - private final int estimatedTotalOperations; + private final int totalOperations; private int index; /** @@ -41,30 +36,18 @@ final class MultiSnapshot implements Translog.Snapshot { */ MultiSnapshot(Translog.Snapshot[] translogs) { this.translogs = translogs; - int ops = 0; - for (Translog.Snapshot translog : translogs) { - - final int tops = translog.estimatedTotalOperations(); - if (tops == TranslogReader.UNKNOWN_OP_COUNT) { - ops = TranslogReader.UNKNOWN_OP_COUNT; - break; - } - assert tops >= 0 : "tops must be positive but was: " + tops; - ops += tops; - } - estimatedTotalOperations = ops; + totalOperations = Arrays.stream(translogs).mapToInt(Translog.Snapshot::totalOperations).sum(); index = 0; } @Override - public int estimatedTotalOperations() { - return estimatedTotalOperations; + public int totalOperations() { + return totalOperations; } @Override public Translog.Operation next() throws IOException { - ensureOpen(); for (; index < translogs.length; index++) { final Translog.Snapshot current = translogs[index]; Translog.Operation op = current.next(); @@ -74,17 +57,4 @@ final class MultiSnapshot implements Translog.Snapshot { } return null; } - - protected void ensureOpen() { - if (closed.get()) { - throw new AlreadyClosedException("snapshot already closed"); - } - } - - @Override - public void close() throws ElasticsearchException { - if (closed.compareAndSet(false, true)) { - Releasables.close(translogs); - 
} - } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 0001fda0752..b2e81de044b 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -34,12 +34,9 @@ import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -53,7 +50,6 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -69,6 +65,8 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * A Translog is a per index shard component that records all non-committed index operations in a durable manner. @@ -112,29 +110,25 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$"); - private final List recoveredTranslogs; + // the list of translog readers is guaranteed to be in order of translog generation + private final List readers = new ArrayList<>(); private volatile ScheduledFuture syncScheduler; // this is a concurrent set and is not protected by any of the locks. The main reason - // is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed) + // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed) private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); private BigArrays bigArrays; protected final ReleasableLock readLock; protected final ReleasableLock writeLock; private final Path location; private TranslogWriter current; - private volatile ImmutableTranslogReader currentCommittingTranslog; - private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion. + + private final static long NOT_SET_GENERATION = -1; // -1 is safe as it will not cause a translog deletion. 
+ + private volatile long currentCommittingGeneration = NOT_SET_GENERATION; + private volatile long lastCommittedTranslogFileGeneration = NOT_SET_GENERATION; private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final String translogUUID; - private Callback onViewClose = new Callback() { - @Override - public void handle(View view) { - logger.trace("closing view starting at translog [{}]", view.minTranslogGeneration()); - boolean removed = outstandingViews.remove(view); - assert removed : "View was never set but was supposed to be removed"; - } - }; /** @@ -176,11 +170,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // if not we don't even try to clean it up and wait until we fail creating it assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]"; if (Files.exists(currentCheckpointFile) // current checkpoint is already copied - && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning + && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName()); } - this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint); - if (recoveredTranslogs.isEmpty()) { + this.readers.addAll(recoverFromFiles(translogGeneration, checkpoint)); + if (readers.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); } boolean success = false; @@ -193,11 +187,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // for instance if we have a lot of tlog and we can't create the writer we keep on holding // on to all the uncommitted tlog files if we don't close if (success == false) { - IOUtils.closeWhileHandlingException(recoveredTranslogs); + IOUtils.closeWhileHandlingException(readers); } } } else { - this.recoveredTranslogs = Collections.emptyList(); IOUtils.rm(location); logger.debug("wipe translog location - creating new translog"); Files.createDirectories(location); @@ -205,21 +198,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC Checkpoint checkpoint = new Checkpoint(0, 0, generation); Checkpoint.write(location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); current = createWriter(generation); - this.lastCommittedTranslogFileGeneration = -1; // playing safe + this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION; } // now that we know which files are there, create a new current one. } catch (Throwable t) { // close the opened translog files if we fail to create a new translog... 
- IOUtils.closeWhileHandlingException(currentCommittingTranslog, current); + IOUtils.closeWhileHandlingException(current); + IOUtils.closeWhileHandlingException(readers); throw t; } } /** recover all translog files found on disk */ - private final ArrayList recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException { + private final ArrayList recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException { boolean success = false; - ArrayList foundTranslogs = new ArrayList<>(); + ArrayList foundTranslogs = new ArrayList<>(); final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work boolean tempFileRenamed = false; try (ReleasableLock lock = writeLock.acquire()) { @@ -230,7 +224,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (Files.exists(committedTranslogFile) == false) { throw new IllegalStateException("translog file doesn't exist with generation: " + i + " lastCommitted: " + lastCommittedTranslogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive"); } - final ImmutableTranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)))); + final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)))); foundTranslogs.add(reader); logger.debug("recovered local translog from checkpoint {}", checkpoint); } @@ -267,17 +261,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return foundTranslogs; } - ImmutableTranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException { - final long generation; - try { - generation = parseIdFromFileName(path); - } catch (IllegalArgumentException ex) { - throw new TranslogException(shardId, "failed to parse generation from file name matching pattern " + path, ex); - } + TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException { FileChannel channel = FileChannel.open(path, StandardOpenOption.READ); try { - final ChannelReference raf = new ChannelReference(path, generation, channel, new OnCloseRunnable()); - ImmutableTranslogReader reader = ImmutableTranslogReader.open(raf, checkpoint, translogUUID); + assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: " + Translog.parseIdFromFileName(path) + " but got: " + checkpoint.generation; + TranslogReader reader = TranslogReader.open(channel, path, checkpoint, translogUUID); channel = null; return reader; } finally { @@ -315,12 +303,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC try { current.sync(); } finally { - try { - IOUtils.close(current, currentCommittingTranslog); - } finally { - IOUtils.close(recoveredTranslogs); - recoveredTranslogs.clear(); - } + closeFilesIfNoPendingViews(); } } finally { FutureUtils.cancel(syncScheduler); @@ -349,41 +332,49 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** * Returns the number of operations in the transaction files that aren't committed to lucene.. 
- * Note: may return -1 if unknown */ public int totalOperations() { - int ops = 0; - try (ReleasableLock lock = readLock.acquire()) { - ops += current.totalOperations(); - if (currentCommittingTranslog != null) { - int tops = currentCommittingTranslog.totalOperations(); - assert tops != TranslogReader.UNKNOWN_OP_COUNT; - assert tops >= 0; - ops += tops; - } - } - return ops; + return totalOperations(lastCommittedTranslogFileGeneration); } /** * Returns the size in bytes of the translog files that aren't committed to lucene. */ public long sizeInBytes() { - long size = 0; - try (ReleasableLock lock = readLock.acquire()) { - size += current.sizeInBytes(); - if (currentCommittingTranslog != null) { - size += currentCommittingTranslog.sizeInBytes(); - } + return sizeInBytes(lastCommittedTranslogFileGeneration); + } + + /** + * Returns the number of operations in the transaction files that aren't committed to lucene.. + */ + private int totalOperations(long minGeneration) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + return Stream.concat(readers.stream(), Stream.of(current)) + .filter(r -> r.getGeneration() >= minGeneration) + .mapToInt(BaseTranslogReader::totalOperations) + .sum(); + } + } + + /** + * Returns the size in bytes of the translog files that aren't committed to lucene. + */ + private long sizeInBytes(long minGeneration) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + return Stream.concat(readers.stream(), Stream.of(current)) + .filter(r -> r.getGeneration() >= minGeneration) + .mapToLong(BaseTranslogReader::sizeInBytes) + .sum(); } - return size; } TranslogWriter createWriter(long fileGeneration) throws IOException { TranslogWriter newFile; try { - newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), getChannelFactory(), config.getBufferSize()); + newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), getChannelFactory(), config.getBufferSize()); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -398,12 +389,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC */ public Translog.Operation read(Location location) { try (ReleasableLock lock = readLock.acquire()) { - final TranslogReader reader; + final BaseTranslogReader reader; final long currentGeneration = current.getGeneration(); if (currentGeneration == location.generation) { reader = current; - } else if (currentCommittingTranslog != null && currentCommittingTranslog.getGeneration() == location.generation) { - reader = currentCommittingTranslog; + } else if (readers.isEmpty() == false && readers.get(readers.size() - 1).getGeneration() == location.generation) { + reader = readers.get(readers.size() - 1); } else if (currentGeneration < location.generation) { throw new IllegalStateException("location generation [" + location.generation + "] is greater than the current generation [" + currentGeneration + "]"); } else { @@ -467,33 +458,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * Snapshots are fixed in time and will not be updated with future operations. 
*/ public Snapshot newSnapshot() { - ensureOpen(); - try (ReleasableLock lock = readLock.acquire()) { - ArrayList toOpen = new ArrayList<>(); - toOpen.addAll(recoveredTranslogs); - if (currentCommittingTranslog != null) { - toOpen.add(currentCommittingTranslog); - } - toOpen.add(current); - return createSnapshot(toOpen.toArray(new TranslogReader[toOpen.size()])); - } + return createSnapshot(Long.MIN_VALUE); } - private static Snapshot createSnapshot(TranslogReader... translogs) { - Snapshot[] snapshots = new Snapshot[translogs.length]; - boolean success = false; - try { - for (int i = 0; i < translogs.length; i++) { - snapshots[i] = translogs[i].newSnapshot(); - } - - Snapshot snapshot = new MultiSnapshot(snapshots); - success = true; - return snapshot; - } finally { - if (success == false) { - Releasables.close(snapshots); - } + private Snapshot createSnapshot(long minGeneration) { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + Snapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) + .filter(reader -> reader.getGeneration() >= minGeneration) + .map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new); + return new MultiSnapshot(snapshots); } } @@ -502,25 +476,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * while receiving future ones as well */ public Translog.View newView() { - // we need to acquire the read lock to make sure no new translog is created - // and will be missed by the view we're making try (ReleasableLock lock = readLock.acquire()) { - ArrayList translogs = new ArrayList<>(); - try { - if (currentCommittingTranslog != null) { - translogs.add(currentCommittingTranslog.clone()); - } - translogs.add(current.newReaderFromWriter()); - View view = new View(translogs, onViewClose); - // this is safe as we know that no new translog is being made at the moment - // (we hold a read lock) and the view will be notified of any future one - outstandingViews.add(view); - translogs.clear(); - return view; - } finally { - // close if anything happend and we didn't reach the clear - IOUtils.closeWhileHandlingException(translogs); - } + ensureOpen(); + View view = new View(lastCommittedTranslogFileGeneration); + outstandingViews.add(view); + return view; } } @@ -561,7 +521,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC */ public boolean ensureSynced(Location location) throws IOException { try (ReleasableLock lock = readLock.acquire()) { - if (location.generation == current.generation) { // if we have a new one it's already synced + if (location.generation == current.getGeneration()) { // if we have a new one it's already synced ensureOpen(); return current.syncUpTo(location.translogLocation + location.size); } @@ -604,151 +564,67 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return config; } - - private final class OnCloseRunnable implements Callback { - @Override - public void handle(ChannelReference channelReference) { - if (isReferencedGeneration(channelReference.getGeneration()) == false) { - Path translogPath = channelReference.getPath(); - assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath; - // if the given translogPath is not the current we can safely delete the file since all references are released - logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); - 
IOUtils.deleteFilesIgnoringExceptions(translogPath); - IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); - - } - try (DirectoryStream stream = Files.newDirectoryStream(location)) { - for (Path path : stream) { - Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString()); - if (matcher.matches()) { - long generation = Long.parseLong(matcher.group(1)); - if (isReferencedGeneration(generation) == false) { - logger.trace("delete translog file - not referenced and not current anymore {}", path); - IOUtils.deleteFilesIgnoringExceptions(path); - IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); - } - } - } - } catch (IOException e) { - logger.warn("failed to delete unreferenced translog files", e); - } - } - } - /** * a view into the translog, capturing all translog file at the moment of creation * and updated with any future translog. */ - public static final class View implements Closeable { - public static final Translog.View EMPTY_VIEW = new View(Collections.emptyList(), null); + /** + * a view into the translog, capturing all translog file at the moment of creation + * and updated with any future translog. + */ + public class View implements Closeable { - boolean closed; - // last in this list is always FsTranslog.current - final List orderedTranslogs; - private final Callback onClose; + AtomicBoolean closed = new AtomicBoolean(); + final long minGeneration; - View(List orderedTranslogs, Callback onClose) { - // clone so we can safely mutate.. - this.orderedTranslogs = new ArrayList<>(orderedTranslogs); - this.onClose = onClose; - } - - /** - * Called by the parent class when ever the current translog changes - * - * @param oldCurrent a new read only reader for the old current (should replace the previous reference) - * @param newCurrent a reader into the new current. - */ - synchronized void onNewTranslog(TranslogReader oldCurrent, TranslogReader newCurrent) throws IOException { - // even though the close method removes this view from outstandingViews, there is no synchronisation in place - // between that operation and an ongoing addition of a new translog, already having an iterator. - // As such, this method can be called despite of the fact that we are closed. We need to check and ignore. - if (closed) { - // we have to close the new references created for as as we will not hold them - IOUtils.close(oldCurrent, newCurrent); - return; - } - orderedTranslogs.remove(orderedTranslogs.size() - 1).close(); - orderedTranslogs.add(oldCurrent); - orderedTranslogs.add(newCurrent); + View(long minGeneration) { + this.minGeneration = minGeneration; } /** this smallest translog generation in this view */ - public synchronized long minTranslogGeneration() { - ensureOpen(); - return orderedTranslogs.get(0).getGeneration(); + public long minTranslogGeneration() { + return minGeneration; } /** * The total number of operations in the view. */ - public synchronized int totalOperations() { - int ops = 0; - for (TranslogReader translog : orderedTranslogs) { - int tops = translog.totalOperations(); - if (tops == TranslogReader.UNKNOWN_OP_COUNT) { - return -1; - } - assert tops >= 0; - ops += tops; - } - return ops; + public int totalOperations() { + return Translog.this.totalOperations(minGeneration); } /** * Returns the size in bytes of the files behind the view. 
*/ - public synchronized long sizeInBytes() { - long size = 0; - for (TranslogReader translog : orderedTranslogs) { - size += translog.sizeInBytes(); - } - return size; + public long sizeInBytes() { + return Translog.this.sizeInBytes(minGeneration); } /** create a snapshot from this view */ - public synchronized Snapshot snapshot() { + public Snapshot snapshot() { ensureOpen(); - return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); + return Translog.this.createSnapshot(minGeneration); } - void ensureOpen() { - if (closed) { - throw new ElasticsearchException("View is already closed"); + if (closed.get()) { + throw new AlreadyClosedException("View is already closed"); } } @Override - public void close() { - final List toClose = new ArrayList<>(); - try { - synchronized (this) { - if (closed == false) { - try { - if (onClose != null) { - onClose.handle(this); - } - } finally { - closed = true; - toClose.addAll(orderedTranslogs); - orderedTranslogs.clear(); - } - } - } - } finally { - try { - // Close out of lock to prevent deadlocks between channel close which checks for - // references in InternalChannelReference.closeInternal (waiting on a read lock) - // and other FsTranslog#newTranslog calling FsView.onNewTranslog (while having a write lock) - IOUtils.close(toClose); - } catch (Exception e) { - throw new ElasticsearchException("failed to close view", e); - } + public void close() throws IOException { + if (closed.getAndSet(true) == false) { + logger.trace("closing view starting at translog [{}]", minTranslogGeneration()); + boolean removed = outstandingViews.remove(this); + assert removed : "View was never set but was supposed to be removed"; + trimUnreferencedReaders(); + closeFilesIfNoPendingViews(); } } } + public static class Location implements Accountable, Comparable { public final long generation; @@ -817,12 +693,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** * A snapshot of the transaction log, allows to iterate over all the transaction log operations. */ - public interface Snapshot extends Releasable { + public interface Snapshot { /** * The total number of operations in the translog. */ - int estimatedTotalOperations(); + int totalOperations(); /** * Returns the next operation in the snapshot or null if we reached the end. 
@@ -1320,13 +1196,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public void prepareCommit() throws IOException { try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); - if (currentCommittingTranslog != null) { - throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); + if (currentCommittingGeneration != NOT_SET_GENERATION) { + throw new IllegalStateException("already committing a translog with generation: " + currentCommittingGeneration); } - final TranslogWriter oldCurrent = current; - oldCurrent.ensureOpen(); - oldCurrent.sync(); - currentCommittingTranslog = current.immutableReader(); + currentCommittingGeneration = current.getGeneration(); + TranslogReader currentCommittingTranslog = current.closeIntoReader(); + readers.add(currentCommittingTranslog); Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration(); Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration())); @@ -1335,14 +1210,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC IOUtils.fsync(commitCheckpoint.getParent(), true); // create a new translog file - this will sync it and update the checkpoint data; current = createWriter(current.getGeneration() + 1); - // notify all outstanding views of the new translog (no views are created now as - // we hold a write lock). - for (View view : outstandingViews) { - view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter()); - } - IOUtils.close(oldCurrent); logger.trace("current translog set to [{}]", current.getGeneration()); - assert oldCurrent.syncNeeded() == false : "old translog oldCurrent must not need a sync"; } catch (Throwable t) { IOUtils.closeWhileHandlingException(this); // tragic event @@ -1352,24 +1220,53 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public void commit() throws IOException { - ImmutableTranslogReader toClose = null; try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); - if (currentCommittingTranslog == null) { + if (currentCommittingGeneration == NOT_SET_GENERATION) { prepareCommit(); } + assert currentCommittingGeneration != NOT_SET_GENERATION; + assert readers.stream().filter(r -> r.getGeneration() == currentCommittingGeneration).findFirst().isPresent() + : "reader list doesn't contain committing generation [" + currentCommittingGeneration + "]"; lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up - if (recoveredTranslogs.isEmpty() == false) { - IOUtils.close(recoveredTranslogs); - recoveredTranslogs.clear(); - } - toClose = this.currentCommittingTranslog; - this.currentCommittingTranslog = null; - } finally { - IOUtils.close(toClose); + currentCommittingGeneration = NOT_SET_GENERATION; + trimUnreferencedReaders(); } } + void trimUnreferencedReaders() { + try (ReleasableLock ignored = writeLock.acquire()) { + if (closed.get()) { + // we're shutdown potentially on some tragic event - don't delete anything + return; + } + long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE); + minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); + final long finalMinReferencedGen = minReferencedGen; + List unreferenced = 
readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList()); + for (final TranslogReader unreferencedReader : unreferenced) { + Path translogPath = unreferencedReader.path(); + logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); + IOUtils.closeWhileHandlingException(unreferencedReader); + IOUtils.deleteFilesIgnoringExceptions(translogPath, + translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration()))); + } + readers.removeAll(unreferenced); + } + } + + void closeFilesIfNoPendingViews() throws IOException { + try (ReleasableLock ignored = writeLock.acquire()) { + if (closed.get() && outstandingViews.isEmpty()) { + logger.trace("closing files. translog is closed and there are no pending views"); + ArrayList<Closeable> toClose = new ArrayList<>(readers); + toClose.add(current); + IOUtils.close(toClose); + } + } + } + + @Override public void rollback() throws IOException { ensureOpen(); @@ -1435,9 +1332,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return TranslogWriter.ChannelFactory.DEFAULT; } - /** If this {@code Translog} was closed as a side-effect of a tragic exception, - * e.g. disk full while flushing a new segment, this returns the root cause exception. - * Otherwise (no tragic exception has occurred) it returns null. */ + /** + * If this {@code Translog} was closed as a side-effect of a tragic exception, + * e.g. disk full while flushing a new segment, this returns the root cause exception. + * Otherwise (no tragic exception has occurred) it returns null. + */ public Throwable getTragicException() { return current.getTragicException(); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 71dff6ec36e..ecc3822361c 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -27,161 +27,46 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import java.io.Closeable; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.FileChannel; -import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.atomic.AtomicBoolean; /** - * A base class for all classes that allows reading ops from translog files + * an immutable translog file reader */ -public abstract class TranslogReader implements Closeable, Comparable<TranslogReader> { - public static final int UNKNOWN_OP_COUNT = -1; +public class TranslogReader extends BaseTranslogReader implements Closeable { private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f; private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00; - protected final long generation; - protected final ChannelReference channelReference; - protected final FileChannel channel; + + private final int totalOperations; + protected final long length; protected final AtomicBoolean closed = new AtomicBoolean(false); - protected final long
firstOperationOffset; - public TranslogReader(long generation, ChannelReference channelReference, long firstOperationOffset) { - this.generation = generation; - this.channelReference = channelReference; - this.channel = channelReference.getChannel(); - this.firstOperationOffset = firstOperationOffset; - } - - public long getGeneration() { - return this.generation; - } - - public abstract long sizeInBytes(); - - abstract public int totalOperations(); - - public final long getFirstOperationOffset() { - return firstOperationOffset; - } - - public Translog.Operation read(Translog.Location location) throws IOException { - assert location.generation == generation : "read location's translog generation [" + location.generation + "] is not [" + generation + "]"; - ByteBuffer buffer = ByteBuffer.allocate(location.size); - try (BufferedChecksumStreamInput checksumStreamInput = checksummedStream(buffer, location.translogLocation, location.size, null)) { - return read(checksumStreamInput); - } - } - - /** read the size of the op (i.e., number of bytes, including the op size) written at the given position */ - private final int readSize(ByteBuffer reusableBuffer, long position) { - // read op size from disk - assert reusableBuffer.capacity() >= 4 : "reusable buffer must have capacity >=4 when reading opSize. got [" + reusableBuffer.capacity() + "]"; - try { - reusableBuffer.clear(); - reusableBuffer.limit(4); - readBytes(reusableBuffer, position); - reusableBuffer.flip(); - // Add an extra 4 to account for the operation size integer itself - final int size = reusableBuffer.getInt() + 4; - final long maxSize = sizeInBytes() - position; - if (size < 0 || size > maxSize) { - throw new TranslogCorruptedException("operation size is corrupted must be [0.." + maxSize + "] but was: " + size); - } - - return size; - } catch (IOException e) { - throw new ElasticsearchException("unexpected exception reading from translog snapshot of " + this.channelReference.getPath(), e); - } - } - - public Translog.Snapshot newSnapshot() { - final ByteBuffer reusableBuffer = ByteBuffer.allocate(1024); - final int totalOperations = totalOperations(); - channelReference.incRef(); - return newReaderSnapshot(totalOperations, reusableBuffer); + /** + * Create a reader of translog file channel. The length parameter should be consistent with totalOperations and point + * at the end of the last operation in this snapshot. + */ + public TranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) { + super(generation, channel, path, firstOperationOffset); + this.length = length; + this.totalOperations = totalOperations; } /** - * reads an operation at the given position and returns it. The buffer length is equal to the number - * of bytes reads. + * Given a file, opens a {@link TranslogReader}, taking care of checking and validating the file header.
*/ - private final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException { - final ByteBuffer buffer; - if (reusableBuffer.capacity() >= opSize) { - buffer = reusableBuffer; - } else { - buffer = ByteBuffer.allocate(opSize); - } - buffer.clear(); - buffer.limit(opSize); - readBytes(buffer, position); - buffer.flip(); - return new BufferedChecksumStreamInput(new ByteBufferStreamInput(buffer), reuse); - } - - protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException { - return Translog.readOperation(inStream); - } - - /** - * reads bytes at position into the given buffer, filling it. - */ - abstract protected void readBytes(ByteBuffer buffer, long position) throws IOException; - - @Override - public final void close() throws IOException { - if (closed.compareAndSet(false, true)) { - channelReference.decRef(); - } - } - - protected final boolean isClosed() { - return closed.get(); - } - - protected void ensureOpen() { - if (isClosed()) { - throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed"); - } - } - - @Override - public String toString() { - return "translog [" + generation + "][" + channelReference.getPath() + "]"; - } - - @Override - public int compareTo(TranslogReader o) { - return Long.compare(getGeneration(), o.getGeneration()); - } - - - /** - * Given a file, return a VersionedTranslogStream based on an - * optionally-existing header in the file. If the file does not exist, or - * has zero length, returns the latest version. If the header does not - * exist, assumes Version 0 of the translog file format. - */ - public static ImmutableTranslogReader open(ChannelReference channelReference, Checkpoint checkpoint, String translogUUID) throws IOException { - final FileChannel channel = channelReference.getChannel(); - final Path path = channelReference.getPath(); - assert channelReference.getGeneration() == checkpoint.generation : "expected generation: " + channelReference.getGeneration() + " but got: " + checkpoint.generation; + public static TranslogReader open(FileChannel channel, Path path, Checkpoint checkpoint, String translogUUID) throws IOException { try { - if (checkpoint.offset == 0 && checkpoint.numOps == TranslogReader.UNKNOWN_OP_COUNT) { // only old files can be empty - return new LegacyTranslogReader(channelReference.getGeneration(), channelReference, 0); - } - - InputStreamStreamInput headerStream = new InputStreamStreamInput(Channels.newInputStream(channel)); // don't close + InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel)); // don't close // Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the // header, in binary this looks like: // @@ -208,20 +93,17 @@ public abstract class TranslogReader implements Closeable, Comparable<TranslogReader> { - assert checkpoint.numOps > TranslogReader.UNKNOWN_OP_COUNT: "expected at least 0 operatin but got: " + checkpoint.numOps; + assert checkpoint.numOps >= 0 : "expected at least 0 operations but got: " + checkpoint.numOps; assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; int len = headerStream.readInt(); if (len > channel.size()) { @@ -232,78 +114,61 @@ public abstract class TranslogReader implements Closeable, Comparable= length) { + throw new EOFException("read requested past EOF.
pos [" + position + "] end: [" + length + "]"); } - - @Override - public final int estimatedTotalOperations() { - return totalOperations; + if (position < firstOperationOffset) { + throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + firstOperationOffset + "]"); } + Channels.readFromFileChannelWithEofException(channel, position, buffer); + } - @Override - public Translog.Operation next() throws IOException { - if (readOperations < totalOperations) { - assert readOperations < totalOperations : "readOpeartions must be less than totalOperations"; - return readOperation(); - } else { - return null; - } + public Checkpoint getInfo() { + return new Checkpoint(length, totalOperations, getGeneration()); + } + + @Override + public final void close() throws IOException { + if (closed.compareAndSet(false, true)) { + channel.close(); } + } - protected final Translog.Operation readOperation() throws IOException { - final int opSize = readSize(reusableBuffer, position); - reuse = checksummedStream(reusableBuffer, position, opSize, reuse); - Translog.Operation op = read(reuse); - position += opSize; - readOperations++; - return op; - } + protected final boolean isClosed() { + return closed.get(); + } - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - channelReference.decRef(); - } + protected void ensureOpen() { + if (isClosed()) { + throw new AlreadyClosedException(toString() + " is already closed"); } } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java similarity index 50% rename from core/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java rename to core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 1d6d3b45a63..10f381f8eba 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/ImmutableTranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.elasticsearch.index.translog; import org.elasticsearch.common.io.Channels; @@ -24,68 +23,82 @@ import org.elasticsearch.common.io.Channels; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; -/** - * a translog reader which is fixed in length - */ -public class ImmutableTranslogReader extends TranslogReader { - +public class TranslogSnapshot extends BaseTranslogReader implements Translog.Snapshot { private final int totalOperations; protected final long length; + private final ByteBuffer reusableBuffer; + private long position; + private int readOperations; + private BufferedChecksumStreamInput reuse; + + /** * Create a snapshot of translog file channel. The length parameter should be consistent with totalOperations and point * at the end of the last operation in this snapshot. 
*/ - public ImmutableTranslogReader(long generation, ChannelReference channelReference, long firstOperationOffset, long length, int totalOperations) { - super(generation, channelReference, firstOperationOffset); + public TranslogSnapshot(long generation, FileChannel channel, Path path, long firstOperationOffset, long length, int totalOperations) { + super(generation, channel, path, firstOperationOffset); this.length = length; this.totalOperations = totalOperations; + this.reusableBuffer = ByteBuffer.allocate(1024); + readOperations = 0; + position = firstOperationOffset; + reuse = null; } @Override - public final TranslogReader clone() { - if (channelReference.tryIncRef()) { - try { - ImmutableTranslogReader reader = newReader(generation, channelReference, firstOperationOffset, length, totalOperations); - channelReference.incRef(); // for the new object - return reader; - } finally { - channelReference.decRef(); - } + public final int totalOperations() { + return totalOperations; + } + + @Override + public Translog.Operation next() throws IOException { + if (readOperations < totalOperations) { + return readOperation(); } else { - throw new IllegalStateException("can't increment translog [" + generation + "] channel ref count"); + return null; } } - - protected ImmutableTranslogReader newReader(long generation, ChannelReference channelReference, long offset, long length, int totalOperations) { - return new ImmutableTranslogReader(generation, channelReference, offset, length, totalOperations); + protected final Translog.Operation readOperation() throws IOException { + final int opSize = readSize(reusableBuffer, position); + reuse = checksummedStream(reusableBuffer, position, opSize, reuse); + Translog.Operation op = read(reuse); + position += opSize; + readOperations++; + return op; } + public long sizeInBytes() { return length; } - public int totalOperations() { - return totalOperations; - } - /** * reads an operation at the given position into the given buffer. */ protected void readBytes(ByteBuffer buffer, long position) throws IOException { if (position >= length) { - throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]"); + throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "], generation: [" + getGeneration() + "], path: [" + path + "]"); } - if (position < firstOperationOffset) { - throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + firstOperationOffset + "]"); + if (position < getFirstOperationOffset()) { + throw new IOException("read requested before position of first ops. 
pos [" + position + "] first op on: [" + getFirstOperationOffset() + "], generation: [" + getGeneration() + "], path: [" + path + "]"); } Channels.readFromFileChannelWithEofException(channel, position, buffer); } - public Checkpoint getInfo() { - return new Checkpoint(length, totalOperations, getGeneration()); + @Override + public String toString() { + return "TranslogSnapshot{" + + "readOperations=" + readOperations + + ", position=" + position + + ", totalOperations=" + totalOperations + + ", length=" + length + + ", reusableBuffer=" + reusableBuffer + + '}'; } - -} +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 517e4a5b30e..f7d0cd571e8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -25,24 +25,23 @@ import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.Callback; import org.elasticsearch.index.shard.ShardId; import java.io.BufferedOutputStream; +import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.nio.file.Files; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.concurrent.atomic.AtomicBoolean; -public class TranslogWriter extends TranslogReader { +public class TranslogWriter extends BaseTranslogReader implements Closeable { public static final String TRANSLOG_CODEC = "translog"; public static final int VERSION_CHECKSUMS = 1; @@ -61,11 +60,14 @@ public class TranslogWriter extends TranslogReader { /* the total offset of this file including the bytes written to the file as well as into the buffer */ private volatile long totalOffset; - public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException { - super(generation, channelReference, channelReference.getChannel().position()); + protected final AtomicBoolean closed = new AtomicBoolean(false); + + + public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException { + super(generation, channel, path, channel.position()); this.shardId = shardId; - this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channelReference.getChannel()), bufferSize.bytesAsInt()); - this.lastSyncedOffset = channelReference.getChannel().position(); + this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt()); + this.lastSyncedOffset = channel.position(); totalOffset = lastSyncedOffset; } @@ -74,10 +76,10 @@ public class TranslogWriter extends TranslogReader { } private static int getHeaderLength(int uuidLength) { - return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT; + return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT; } - public 
static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); @@ -90,7 +92,7 @@ public class TranslogWriter extends TranslogReader { out.writeBytes(ref.bytes, ref.offset, ref.length); channel.force(true); writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE); - final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize); + final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, channel, file, bufferSize); return writer; } catch (Throwable throwable) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that @@ -99,9 +101,12 @@ public class TranslogWriter extends TranslogReader { throw throwable; } } - /** If this {@code TranslogWriter} was closed as a side-effect of a tragic exception, - * e.g. disk full while flushing a new segment, this returns the root cause exception. - * Otherwise (no tragic exception has occurred) it returns null. */ + + /** + * If this {@code TranslogWriter} was closed as a side-effect of a tragic exception, + * e.g. disk full while flushing a new segment, this returns the root cause exception. + * Otherwise (no tragic exception has occurred) it returns null. + */ public Throwable getTragicException() { return tragedy; } @@ -110,7 +115,9 @@ public class TranslogWriter extends TranslogReader { assert throwable != null : "throwable must not be null in a tragic event"; if (tragedy == null) { tragedy = throwable; - } else { + } else if (tragedy != throwable) { + // it should be safe to call closeWithTragicEvents on multiple layers without + // worrying about self suppression. tragedy.addSuppressed(throwable); } close(); @@ -134,29 +141,27 @@ public class TranslogWriter extends TranslogReader { } /** - * write all buffered ops to disk and fsync file + * write all buffered ops to disk and fsync file. + * + * Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before + * raising the exception. 
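Once a tragic event has closed the writer, later operations fail with AlreadyClosedException from ensureOpen(), and getTragicException() is how callers recover the root cause (for example a disk-full error). A hedged sketch of that caller-side pattern, assuming only the methods shown in this patch; the helper name and the wrapping IOException are illustrative:

import org.apache.lucene.store.AlreadyClosedException;

import java.io.IOException;

// Surface the original failure instead of a bare "already closed" error when the writer
// was shut down by an earlier tragic event.
static void syncOrReportTragedy(TranslogWriter writer) throws IOException {
    try {
        writer.sync();
    } catch (AlreadyClosedException ex) {
        final Throwable tragedy = writer.getTragicException();
        if (tragedy != null) {
            throw new IOException("translog writer was closed by an earlier tragic event", tragedy);
        }
        throw ex;
    }
}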
*/ public void sync() throws IOException { if (syncNeeded()) { synchronized (this) { - ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event - channelReference.incRef(); + ensureOpen(); + final long offsetToSync; + final int opsCounter; try { - final long offsetToSync; - final int opsCounter; outputStream.flush(); offsetToSync = totalOffset; opsCounter = operationCounter; - try { - checkpoint(offsetToSync, opsCounter, channelReference); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - lastSyncedOffset = offsetToSync; - } finally { - channelReference.decRef(); + checkpoint(offsetToSync, opsCounter, generation, channel, path); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; } + lastSyncedOffset = offsetToSync; } } } @@ -177,76 +182,36 @@ public class TranslogWriter extends TranslogReader { } /** - * returns a new reader that follows the current writes (most importantly allows making - * repeated snapshots that includes new content) + * closes this writer and transfers it's underlying file channel to a new immutable reader */ - public TranslogReader newReaderFromWriter() { - ensureOpen(); - channelReference.incRef(); - boolean success = false; + public synchronized TranslogReader closeIntoReader() throws IOException { try { - final TranslogReader reader = new InnerReader(this.generation, firstOperationOffset, channelReference); - success = true; - return reader; - } finally { - if (!success) { - channelReference.decRef(); - } + sync(); // sync before we close.. + } catch (IOException e) { + closeWithTragicEvent(e); + throw e; } - } - - /** - * returns a new immutable reader which only exposes the current written operation * - */ - public ImmutableTranslogReader immutableReader() throws TranslogException { - if (channelReference.tryIncRef()) { - synchronized (this) { - try { - ensureOpen(); - outputStream.flush(); - ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, getWrittenOffset(), operationCounter); - channelReference.incRef(); // for new reader - return reader; - } catch (Exception e) { - throw new TranslogException(shardId, "exception while creating an immutable reader", e); - } finally { - channelReference.decRef(); - } - } + if (closed.compareAndSet(false, true)) { + return new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter); } else { - throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count"); + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); } } + @Override + public synchronized Translog.Snapshot newSnapshot() { + ensureOpen(); + try { + sync(); + } catch (IOException e) { + throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e); + } + return super.newSnapshot(); + } + private long getWrittenOffset() throws IOException { - return channelReference.getChannel().position(); - } - - /** - * this class is used when one wants a reference to this file which exposes all recently written operation. 
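closeIntoReader() replaces the ref-counted immutableReader()/newReaderFromWriter() pair: instead of sharing the channel through a ChannelReference, the writer syncs, flips its closed flag, and hands the same FileChannel to a fixed-length TranslogReader. A hedged usage sketch; the helper and the readers list are illustrative, not part of the patch:

import java.io.IOException;
import java.util.List;

// Illustrative generation roll: the old writer becomes an immutable, fixed-length reader
// that keeps serving reads from the same channel. closeIntoReader() syncs first, so no
// buffered operation can be lost in the handoff.
static TranslogReader rollToReader(TranslogWriter writer, List<TranslogReader> readers) throws IOException {
    TranslogReader reader = writer.closeIntoReader(); // sync, then transfer channel ownership
    readers.add(reader);                              // still readable, now fixed in length
    return reader;
}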
- * as such it needs access to the internals of the current reader - */ - final class InnerReader extends TranslogReader { - - public InnerReader(long generation, long fistOperationOffset, ChannelReference channelReference) { - super(generation, channelReference, fistOperationOffset); - } - - @Override - public long sizeInBytes() { - return TranslogWriter.this.sizeInBytes(); - } - - @Override - public int totalOperations() { - return TranslogWriter.this.totalOperations(); - } - - @Override - protected void readBytes(ByteBuffer buffer, long position) throws IOException { - TranslogWriter.this.readBytes(buffer, position); - } + return channel.position(); } /** @@ -264,13 +229,13 @@ public class TranslogWriter extends TranslogReader { @Override protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException { - if (position+targetBuffer.remaining() > getWrittenOffset()) { + if (position + targetBuffer.remaining() > getWrittenOffset()) { synchronized (this) { // we only flush here if it's really really needed - try to minimize the impact of the read operation // in some cases ie. a tragic event we might still be able to read the relevant value // which is not really important in production but some test can make most strict assumptions // if we don't fail in this call unless absolutely necessary. - if (position+targetBuffer.remaining() > getWrittenOffset()) { + if (position + targetBuffer.remaining() > getWrittenOffset()) { outputStream.flush(); } } @@ -280,9 +245,9 @@ public class TranslogWriter extends TranslogReader { Channels.readFromFileChannelWithEofException(channel, position, targetBuffer); } - private synchronized void checkpoint(long lastSyncPosition, int operationCounter, ChannelReference channelReference) throws IOException { - channelReference.getChannel().force(false); - writeCheckpoint(lastSyncPosition, operationCounter, channelReference.getPath().getParent(), channelReference.getGeneration(), StandardOpenOption.WRITE); + private synchronized void checkpoint(long lastSyncPosition, int operationCounter, long generation, FileChannel translogFileChannel, Path translogFilePath) throws IOException { + translogFileChannel.force(false); + writeCheckpoint(lastSyncPosition, operationCounter, translogFilePath.getParent(), generation, StandardOpenOption.WRITE); } private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... 
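writeCheckpoint() is the durability half of the contract: the channel is forced before the checkpoint advertises a new (offset, numOps, generation) triple. The read half is that anyone re-opening a generation trusts the checkpoint for the readable length and operation count. A small sketch mirroring the usage in the tests later in this diff; only the helper name is illustrative:

import java.io.IOException;
import java.nio.file.Path;

// Open a reader for the current generation, bounded by whatever the checkpoint says is durable.
static TranslogReader openCurrentGeneration(Translog translog) throws IOException {
    Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
    Path current = translog.location().resolve(Translog.getFilename(translog.currentFileGeneration()));
    return translog.openReader(current, checkpoint);
}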
options) throws IOException { @@ -307,6 +272,17 @@ public class TranslogWriter extends TranslogReader { } } + @Override + public final void close() throws IOException { + if (closed.compareAndSet(false, true)) { + channel.close(); + } + } + + protected final boolean isClosed() { + return closed.get(); + } + private final class BufferedChannelOutputStream extends BufferedOutputStream { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index c0270e71721..1ef9215b7b4 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -82,7 +83,7 @@ public class RecoverySource extends AbstractComponent implements IndexEventListe } } - private RecoveryResponse recover(final StartRecoveryRequest request) { + private RecoveryResponse recover(final StartRecoveryRequest request) throws IOException { final IndexService indexService = indicesService.indexServiceSafe(request.shardId().index().name()); final IndexShard shard = indexService.getShard(request.shardId().id()); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 94c78efccd8..4699e8d5ace 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -120,7 +120,7 @@ public class RecoverySourceHandler { /** * performs the recovery from the local engine to the target */ - public RecoveryResponse recoverToTarget() { + public RecoveryResponse recoverToTarget() throws IOException { try (Translog.View translogView = shard.acquireTranslogView()) { logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration()); final IndexCommit phase1Snapshot; @@ -144,8 +144,8 @@ public class RecoverySourceHandler { } logger.trace("snapshot translog for recovery. 
current size is [{}]", translogView.totalOperations()); - try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) { - phase2(phase2Snapshot); + try { + phase2(translogView.snapshot()); } catch (Throwable e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -308,7 +308,7 @@ public class RecoverySourceHandler { }); } - prepareTargetForTranslog(translogView); + prepareTargetForTranslog(translogView.totalOperations()); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime()); response.phase1Time = stopWatch.totalTime().millis(); @@ -320,8 +320,7 @@ public class RecoverySourceHandler { } - - protected void prepareTargetForTranslog(final Translog.View translogView) { + protected void prepareTargetForTranslog(final int totalTranslogOps) { StopWatch stopWatch = new StopWatch().start(); logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode()); final long startEngineStart = stopWatch.totalTime().millis(); @@ -332,7 +331,7 @@ public class RecoverySourceHandler { // operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()), + new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), totalTranslogOps), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -463,14 +462,14 @@ public class RecoverySourceHandler { // make sense to re-enable throttling in this phase cancellableThreads.execute(() -> { final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( - request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations()); + request.recoveryId(), request.shardId(), operations, snapshot.totalOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); if (logger.isTraceEnabled()) { logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}", indexName, shardId, ops, new ByteSizeValue(size), - snapshot.estimatedTotalOperations(), + snapshot.totalOperations(), request.targetNode()); } @@ -488,7 +487,7 @@ public class RecoverySourceHandler { if (!operations.isEmpty()) { cancellableThreads.execute(() -> { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( - request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations()); + request.recoveryId(), request.shardId(), operations, snapshot.totalOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); @@ -497,7 +496,7 @@ public class RecoverySourceHandler { if (logger.isTraceEnabled()) { logger.trace("[{}][{}] sent final batch of [{}][{}] (total: [{}]) translog operations to {}", indexName, shardId, ops, new ByteSizeValue(size), - snapshot.estimatedTotalOperations(), + snapshot.totalOperations(), request.targetNode()); 
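With the snapshot no longer Closeable, phase2 simply consumes it once via next(), and every transport request now reports snapshot.totalOperations() in place of the removed estimatedTotalOperations(). A skeleton of that batching loop follows; sendBatch() is a stand-in for the TRANSLOG_OPS transport call, and the fixed batch size is illustrative (the real handler cuts batches using its recovery settings and byte sizes):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Drain the snapshot once, shipping operations in batches and always quoting the
// snapshot's total operation count so the target can report progress.
int sendSnapshot(Translog.Snapshot snapshot) throws IOException {
    final List<Translog.Operation> batch = new ArrayList<>();
    int sentOps = 0;
    Translog.Operation op;
    while ((op = snapshot.next()) != null) {
        batch.add(op);
        if (batch.size() >= 1000) { // illustrative cutoff
            sendBatch(batch, snapshot.totalOperations());
            sentOps += batch.size();
            batch.clear();
        }
    }
    if (batch.isEmpty() == false) {
        sendBatch(batch, snapshot.totalOperations());
        sentOps += batch.size();
    }
    return sentOps;
}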
} return totalOperations; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index e849580b2c4..16bd1d46553 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -58,7 +58,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { shard.failShard("failed to close engine (phase1)", e); } } - prepareTargetForTranslog(Translog.View.EMPTY_VIEW); + prepareTargetForTranslog(0); finalizeRecovery(); return response; } catch (Throwable t) { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index da5e5c8aa08..e1935328232 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -138,8 +138,8 @@ public class TranslogTests extends ESTestCase { private TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build(); ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize); } @@ -234,19 +234,16 @@ public class TranslogTests extends ESTestCase { ArrayList ops = new ArrayList<>(); Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - snapshot.close(); addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1})); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size())); - snapshot.close(); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); addToTranslogAndList(translog, ops, new Translog.Delete(newUid("2"))); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size())); - snapshot.close(); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); snapshot = translog.newSnapshot(); @@ -260,22 +257,18 @@ public class TranslogTests extends ESTestCase { assertThat(snapshot.next(), equalTo(null)); - snapshot.close(); - long firstId = translog.currentFileGeneration(); translog.prepareCommit(); assertThat(translog.currentFileGeneration(), Matchers.not(equalTo(firstId))); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size())); - snapshot.close(); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); translog.commit(); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - assertThat(snapshot.estimatedTotalOperations(), equalTo(0)); - snapshot.close(); + assertThat(snapshot.totalOperations(), equalTo(0)); } protected TranslogStats stats() throws IOException { @@ -337,9 +330,9 @@ public class TranslogTests extends ESTestCase { 
assertEquals(6, copy.estimatedNumberOfOperations()); assertEquals(431, copy.getTranslogSizeInBytes()); assertEquals("\"translog\"{\n" + - " \"operations\" : 6,\n" + - " \"size_in_bytes\" : 431\n" + - "}", copy.toString().trim()); + " \"operations\" : 6,\n" + + " \"size_in_bytes\" : 431\n" + + "}", copy.toString().trim()); try { new TranslogStats(1, -1); @@ -359,51 +352,43 @@ public class TranslogTests extends ESTestCase { ArrayList ops = new ArrayList<>(); Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - snapshot.close(); addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1})); snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.estimatedTotalOperations(), equalTo(1)); - snapshot.close(); + assertThat(snapshot.totalOperations(), equalTo(1)); snapshot = translog.newSnapshot(); - assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot.estimatedTotalOperations(), equalTo(1)); - - // snapshot while another is open Translog.Snapshot snapshot1 = translog.newSnapshot(); - assertThat(snapshot1, SnapshotMatchers.size(1)); - assertThat(snapshot1.estimatedTotalOperations(), equalTo(1)); + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(1)); - snapshot.close(); - snapshot1.close(); + assertThat(snapshot1, SnapshotMatchers.size(1)); + assertThat(snapshot1.totalOperations(), equalTo(1)); } public void testSnapshotWithNewTranslog() throws IOException { ArrayList ops = new ArrayList<>(); Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); - snapshot.close(); addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1})); Translog.Snapshot snapshot1 = translog.newSnapshot(); addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2})); + assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); + translog.prepareCommit(); addToTranslogAndList(translog, ops, new Translog.Index("test", "3", new byte[]{3})); - Translog.Snapshot snapshot2 = translog.newSnapshot(); - translog.commit(); - assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); - assertThat(snapshot2.estimatedTotalOperations(), equalTo(ops.size())); - - - assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); - snapshot1.close(); - snapshot2.close(); + try (Translog.View view = translog.newView()) { + Translog.Snapshot snapshot2 = translog.newSnapshot(); + translog.commit(); + assertThat(snapshot2, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot2.totalOperations(), equalTo(ops.size())); + } } public void testSnapshotOnClosedTranslog() throws IOException { @@ -418,39 +403,6 @@ public class TranslogTests extends ESTestCase { } } - public void testDeleteOnSnapshotRelease() throws Exception { - ArrayList firstOps = new ArrayList<>(); - addToTranslogAndList(translog, firstOps, new Translog.Index("test", "1", new byte[]{1})); - - Translog.Snapshot firstSnapshot = translog.newSnapshot(); - assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1)); - translog.commit(); - assertFileIsPresent(translog, 1); - - - ArrayList secOps = new ArrayList<>(); - addToTranslogAndList(translog, secOps, new Translog.Index("test", "2", new byte[]{2})); - assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1)); - - Translog.Snapshot secondSnapshot = translog.newSnapshot(); - translog.add(new Translog.Index("test", "3", new byte[]{3})); 
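The change to testSnapshotWithNewTranslog above captures the new retention rule: a bare snapshot no longer keeps old generations on disk (which is also why testDeleteOnSnapshotRelease is removed below); only an open Translog.View does. A minimal sketch of the pattern the test now relies on, wrapped in an illustrative helper:

import java.io.IOException;

// Hold a view for as long as a snapshot must stay readable across a commit; the view,
// not the snapshot, is what prevents committed generations from being trimmed.
static void replayAcrossCommit(Translog translog) throws IOException {
    try (Translog.View view = translog.newView()) {
        Translog.Snapshot snapshot = translog.newSnapshot();
        translog.commit(); // older generations survive because the view still references them
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            // replay op ...
        }
    }
}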
- assertThat(secondSnapshot, SnapshotMatchers.equalsTo(secOps)); - assertThat(secondSnapshot.estimatedTotalOperations(), equalTo(1)); - assertFileIsPresent(translog, 1); - assertFileIsPresent(translog, 2); - - firstSnapshot.close(); - assertFileDeleted(translog, 1); - assertFileIsPresent(translog, 2); - secondSnapshot.close(); - assertFileIsPresent(translog, 2); // it's the current nothing should be deleted - translog.commit(); - assertFileIsPresent(translog, 3); // it's the current nothing should be deleted - assertFileDeleted(translog, 2); - - } - - public void assertFileIsPresent(Translog translog, long id) { if (Files.exists(translogDir.resolve(Translog.getFilename(id)))) { return; @@ -624,14 +576,8 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(1)); assertFileIsPresent(translog, 1); - assertThat(snapshot.estimatedTotalOperations(), equalTo(1)); - if (randomBoolean()) { - translog.close(); - snapshot.close(); - } else { - snapshot.close(); - translog.close(); - } + assertThat(snapshot.totalOperations(), equalTo(1)); + translog.close(); assertFileIsPresent(translog, 1); } @@ -708,16 +654,21 @@ public class TranslogTests extends ESTestCase { public void onFailure(Throwable t) { logger.error("--> reader [{}] had an error", t, threadId); errors.add(t); - closeView(); + try { + closeView(); + } catch (IOException e) { + logger.error("unexpected error while closing view, after failure"); + t.addSuppressed(e); + } } - void closeView() { + void closeView() throws IOException { if (view != null) { view.close(); } } - void newView() { + void newView() throws IOException { closeView(); view = translog.newView(); // captures the currently written ops so we know what to expect from the view @@ -738,17 +689,16 @@ public class TranslogTests extends ESTestCase { // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeAll(writtenOpsAtView); - try (Translog.Snapshot snapshot = view.snapshot()) { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - expectedOps.remove(op); - } + Translog.Snapshot snapshot = view.snapshot(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + expectedOps.remove(op); } if (expectedOps.isEmpty() == false) { StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()).append(" operations"); boolean failed = false; - for (Translog.Operation op : expectedOps) { - final Translog.Location loc = writtenOps.get(op); + for (Translog.Operation expectedOp : expectedOps) { + final Translog.Location loc = writtenOps.get(expectedOp); if (loc.generation < view.minTranslogGeneration()) { // writtenOps is only updated after the op was written to the translog. 
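The bookkeeping in the concurrent reader test above boils down to one invariant: every written operation must show up in a view's snapshot unless it belongs to a generation older than view.minTranslogGeneration(). A sketch of that invariant as a helper (illustrative name; java.util imports assumed), using only calls that appear in the test:

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Returns the operations that the view failed to expose; a correct view leaves this empty.
static Set<Translog.Operation> missingFromView(Translog.View view,
                                               Map<Translog.Operation, Translog.Location> writtenOps) throws IOException {
    Set<Translog.Operation> expected = new HashSet<>(writtenOps.keySet());
    Translog.Snapshot snapshot = view.snapshot();
    Translog.Operation op;
    while ((op = snapshot.next()) != null) {
        expected.remove(op);
    }
    // operations from generations older than the view's minimum may legitimately be absent
    expected.removeIf(o -> writtenOps.get(o).generation < view.minTranslogGeneration());
    return expected;
}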
This mean // that ops written to the translog before the view was taken (and will be missing from the view) @@ -756,7 +706,7 @@ public class TranslogTests extends ESTestCase { continue; } failed = true; - missed.append("\n --> [").append(op).append("] written at ").append(loc); + missed.append("\n --> [").append(expectedOp).append("] written at ").append(loc); } if (failed) { fail(missed.toString()); @@ -803,7 +753,6 @@ public class TranslogTests extends ESTestCase { } } - public void testSyncUpTo() throws IOException { int translogOperations = randomIntBetween(10, 100); int count = 0; @@ -875,7 +824,7 @@ public class TranslogTests extends ESTestCase { final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)); - try (final ImmutableTranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { + try (final TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { assertEquals(lastSynced + 1, reader.totalOperations()); for (int op = 0; op < translogOperations; op++) { Translog.Location location = locations.get(op); @@ -913,7 +862,7 @@ public class TranslogTests extends ESTestCase { } writer.sync(); - final TranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME))); + final BaseTranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME))); for (int i = 0; i < numOps; i++) { ByteBuffer buffer = ByteBuffer.allocate(4); reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); @@ -926,7 +875,7 @@ public class TranslogTests extends ESTestCase { out.writeInt(2048); writer.add(new BytesArray(bytes)); - if (reader instanceof ImmutableTranslogReader) { + if (reader instanceof TranslogReader) { ByteBuffer buffer = ByteBuffer.allocate(4); try { reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * numOps); @@ -934,6 +883,7 @@ public class TranslogTests extends ESTestCase { } catch (EOFException ex) { // expected } + ((TranslogReader) reader).close(); } else { // live reader! 
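The raw-read assertions above lean on two properties of BaseTranslogReader: getFirstOperationOffset() points just past the file header, and readBytes() fills the buffer completely or fails with an EOF. A sketch of that access pattern, assuming the same package-level access to the protected readBytes() that the test enjoys; the helper itself is illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;

// Read the i-th 4-byte payload written by the test: header offset + 4 * index.
static int readIntAt(BaseTranslogReader reader, int index) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(4);
    reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * index);
    buffer.flip();
    return buffer.getInt();
}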
ByteBuffer buffer = ByteBuffer.allocate(4); @@ -943,7 +893,7 @@ public class TranslogTests extends ESTestCase { final int value = buffer.getInt(); assertEquals(2048, value); } - IOUtils.close(writer, reader); + IOUtils.close(writer); } public void testBasicRecovery() throws IOException { @@ -971,19 +921,17 @@ public class TranslogTests extends ESTestCase { assertEquals(0, translog.stats().estimatedNumberOfOperations()); assertEquals(1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - assertNull(snapshot.next()); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + assertNull(snapshot.next()); } else { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - for (int i = minUncommittedOp; i < translogOperations; i++) { - assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + for (int i = minUncommittedOp; i < translogOperations; i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8())); } } } @@ -1014,13 +962,12 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); } } if (randomBoolean()) { // recover twice @@ -1028,13 +975,12 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int upTo = sync ? 
translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); } } } @@ -1071,14 +1017,14 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); } + } if (randomBoolean()) { // recover twice @@ -1086,13 +1032,12 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int upTo = sync ? translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); } } } @@ -1132,13 +1077,12 @@ public class TranslogTests extends ESTestCase { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int upTo = sync ? 
translogOperations : prepareOp; - for (int i = 0; i < upTo; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null synced: " + sync, next); - assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8())); } } } @@ -1209,14 +1153,13 @@ public class TranslogTests extends ESTestCase { } config.setTranslogGeneration(translogGeneration); this.translog = new Translog(config); - try (Translog.Snapshot snapshot = this.translog.newSnapshot()) { - for (int i = firstUncommitted; i < translogOperations; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("" + i, next); - assertEquals(Integer.parseInt(next.getSource().source.toUtf8()), i); - } - assertNull(snapshot.next()); + Translog.Snapshot snapshot = this.translog.newSnapshot(); + for (int i = firstUncommitted; i < translogOperations; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("" + i, next); + assertEquals(Integer.parseInt(next.getSource().source.toUtf8()), i); } + assertNull(snapshot.next()); } public void testFailOnClosedWrite() throws IOException { @@ -1287,12 +1230,12 @@ public class TranslogTests extends ESTestCase { case CREATE: case INDEX: op = new Translog.Index("test", threadId + "_" + opCount, - randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); + randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); break; case DELETE: op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount), - 1 + randomInt(100000), - randomFrom(VersionType.values())); + 1 + randomInt(100000), + randomFrom(VersionType.values())); break; default: throw new ElasticsearchException("not supported op type"); @@ -1383,14 +1326,13 @@ public class TranslogTests extends ESTestCase { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); - try (Translog.Snapshot snapshot = tlog.newSnapshot()) { - assertEquals(opsSynced, snapshot.estimatedTotalOperations()); - for (int i = 0; i < opsSynced; i++) { - assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = tlog.newSnapshot(); + assertEquals(opsSynced, snapshot.totalOperations()); + for (int i = 0; i < opsSynced; i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8())); } } } @@ -1401,13 +1343,12 @@ public class TranslogTests extends ESTestCase { LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; 
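The same verification block recurs in several of the recovery tests above; a helper like the following (illustrative, not part of the patch) captures it with the new snapshot API: open a snapshot and check that operations [0..upTo) come back in order with the expected integer payloads.

import java.io.IOException;

// Shared assertion for the recovery tests: the recovered translog replays ops 0..upTo-1 in order.
static void assertOpsRecovered(Translog translog, int upTo) throws IOException {
    Translog.Snapshot snapshot = translog.newSnapshot();
    for (int i = 0; i < upTo; i++) {
        Translog.Operation next = snapshot.next();
        assertNotNull("operation " + i + " must be non-null", next);
        assertEquals("payload mismatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
    }
}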
opsAdded++) { locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations()); - for (int i = 0; i < opsAdded; i++) { - assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - } + Translog.Snapshot snapshot = this.translog.newSnapshot(); + assertEquals(opsAdded + 1, snapshot.totalOperations()); + for (int i = 0; i < opsAdded; i++) { + assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); } } } @@ -1511,20 +1452,20 @@ public class TranslogTests extends ESTestCase { } config.setTranslogGeneration(translog.getGeneration()); try (Translog tlog = new Translog(config)) { - try (Translog.Snapshot snapshot = tlog.newSnapshot()) { - if (writtenOperations.size() != snapshot.estimatedTotalOperations()) { - for (int i = 0; i < threadCount; i++) { - if (threadExceptions[i] != null) - threadExceptions[i].printStackTrace(); + Translog.Snapshot snapshot = tlog.newSnapshot(); + if (writtenOperations.size() != snapshot.totalOperations()) { + for (int i = 0; i < threadCount; i++) { + if (threadExceptions[i] != null) { + threadExceptions[i].printStackTrace(); } } - assertEquals(writtenOperations.size(), snapshot.estimatedTotalOperations()); - for (int i = 0; i < writtenOperations.size(); i++) { - assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation); - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals(next, writtenOperations.get(i).operation); - } + } + assertEquals(writtenOperations.size(), snapshot.totalOperations()); + for (int i = 0; i < writtenOperations.size(); i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(next, writtenOperations.get(i).operation); } } } @@ -1537,6 +1478,7 @@ public class TranslogTests extends ESTestCase { private static class FailSwitch { private volatile int failRate; private volatile boolean onceFailedFailAlways = false; + public boolean fail() { boolean fail = randomIntBetween(1, 100) <= failRate; if (fail && onceFailedFailAlways) { @@ -1716,24 +1658,22 @@ public class TranslogTests extends ESTestCase { try (Translog tlog = new Translog(config)) { assertNotNull(translogGeneration); assertFalse(tlog.syncNeeded()); - try (Translog.Snapshot snapshot = tlog.newSnapshot()) { - for (int i = 0; i < 1; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = tlog.newSnapshot(); + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " 
+ i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); } tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } try (Translog tlog = new Translog(config)) { assertNotNull(translogGeneration); assertFalse(tlog.syncNeeded()); - try (Translog.Snapshot snapshot = tlog.newSnapshot()) { - for (int i = 0; i < 2; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = tlog.newSnapshot(); + for (int i = 0; i < 2; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); } } } @@ -1749,7 +1689,7 @@ public class TranslogTests extends ESTestCase { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); config.setTranslogGeneration(translogGeneration); - try { + try { Translog tlog = new Translog(config); fail("file already exists?"); } catch (TranslogException ex) { @@ -1758,6 +1698,7 @@ public class TranslogTests extends ESTestCase { assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class); } } + public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); Translog.TranslogGeneration translogGeneration = translog.getGeneration(); @@ -1774,17 +1715,16 @@ public class TranslogTests extends ESTestCase { try (Translog tlog = new Translog(config)) { assertNotNull(translogGeneration); assertFalse(tlog.syncNeeded()); - try (Translog.Snapshot snapshot = tlog.newSnapshot()) { - for (int i = 0; i < 1; i++) { - Translog.Operation next = snapshot.next(); - assertNotNull("operation " + i + " must be non-null", next); - assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); - } + Translog.Snapshot snapshot = tlog.newSnapshot(); + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); } tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } - try { + try { Translog tlog = new Translog(config); fail("file already exists?"); } catch (TranslogException ex) { @@ -1863,13 +1803,12 @@ public class TranslogTests extends ESTestCase { } try (Translog translog = new Translog(config)) { - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - assertEquals(syncedDocs.size(), snapshot.estimatedTotalOperations()); - for (int i = 0; i < syncedDocs.size(); i++) { - Translog.Operation next = snapshot.next(); - assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8()); - assertNotNull("operation " + i + " must be non-null", next); - } + Translog.Snapshot snapshot = translog.newSnapshot(); + assertEquals(syncedDocs.size(), snapshot.totalOperations()); + for (int i = 0; i < syncedDocs.size(); i++) { + Translog.Operation next = snapshot.next(); + assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8()); + assertNotNull("operation " + i + " must be non-null", next); } } } diff --git 
a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java index 68f26c504fb..8ae7117d483 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.translog; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.index.VersionType; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -29,66 +28,32 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; /** * Tests for reading old and new translog files */ public class TranslogVersionTests extends ESTestCase { - public void testV0LegacyTranslogVersion() throws Exception { - Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v0.binary"); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - try (ImmutableTranslogReader reader = openReader(translogFile, 0)) { - assertThat("a version0 stream is returned", reader instanceof LegacyTranslogReader, equalTo(true)); - try (final Translog.Snapshot snapshot = reader.newSnapshot()) { - final Translog.Operation operation = snapshot.next(); - assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true)); - Translog.Index op = (Translog.Index) operation; - assertThat(op.id(), equalTo("1")); - assertThat(op.type(), equalTo("doc")); - assertThat(op.source().toUtf8(), equalTo("{\"body\": \"worda wordb wordc wordd \\\"worde\\\" wordf\"}")); - assertThat(op.routing(), equalTo(null)); - assertThat(op.parent(), equalTo(null)); - assertThat(op.version(), equalTo(1L)); - assertThat(op.timestamp(), equalTo(1407312091791L)); - assertThat(op.ttl(), equalTo(-1L)); - assertThat(op.versionType(), equalTo(VersionType.INTERNAL)); - assertNull(snapshot.next()); - } + private void checkFailsToOpen(String file, String expectedMessage) throws IOException { + Path translogFile = getDataPath(file); + assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); + try { + openReader(translogFile, 0); + fail("should be able to open an old translog"); + } catch (IllegalStateException e) { + assertThat(e.getMessage(), containsString(expectedMessage)); } + + } + + public void testV0LegacyTranslogVersion() throws Exception { + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v0.binary", "pre-1.4 translog"); } public void testV1ChecksummedTranslogVersion() throws Exception { - Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1.binary"); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - try (ImmutableTranslogReader reader = openReader(translogFile, 0)) { - try (final Translog.Snapshot snapshot = reader.newSnapshot()) { - - assertThat("a version1 stream is returned", reader instanceof ImmutableTranslogReader, equalTo(true)); - - Translog.Operation operation = snapshot.next(); - - assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true)); - Translog.Index op = (Translog.Index) operation; - assertThat(op.id(), equalTo("Bwiq98KFSb6YjJQGeSpeiw")); - assertThat(op.type(), equalTo("doc")); - assertThat(op.source().toUtf8(), equalTo("{\"body\": 
\"foo\"}")); - assertThat(op.routing(), equalTo(null)); - assertThat(op.parent(), equalTo(null)); - assertThat(op.version(), equalTo(1L)); - assertThat(op.timestamp(), equalTo(1408627184844L)); - assertThat(op.ttl(), equalTo(-1L)); - assertThat(op.versionType(), equalTo(VersionType.INTERNAL)); - - // There are more operations - int opNum = 1; - while (snapshot.next() != null) { - opNum++; - } - assertThat("there should be 5 translog operations", opNum, equalTo(5)); - } - } + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1.binary", "pre-2.0 translog"); } public void testCorruptedTranslogs() throws Exception { @@ -112,47 +77,17 @@ public class TranslogVersionTests extends ESTestCase { e.getMessage().contains("Invalid first byte in translog file, got: 1, expected 0x00 or 0x3f"), equalTo(true)); } - try { - Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary"); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - try (ImmutableTranslogReader reader = openReader(translogFile, 0)) { - try (final Translog.Snapshot snapshot = reader.newSnapshot()) { - while(snapshot.next() != null) { - - } - } - } - fail("should have thrown an exception about the body being corrupted"); - } catch (TranslogCorruptedException e) { - assertThat("translog corruption from body: " + e.getMessage(), - e.getMessage().contains("translog corruption while reading from stream"), equalTo(true)); - } - + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary", "pre-2.0 translog"); } public void testTruncatedTranslog() throws Exception { - try { - Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1-truncated.binary"); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - try (ImmutableTranslogReader reader = openReader(translogFile, 0)) { - try (final Translog.Snapshot snapshot = reader.newSnapshot()) { - while(snapshot.next() != null) { - - } - } - } - fail("should have thrown an exception about the body being truncated"); - } catch (TranslogCorruptedException e) { - assertThat("translog truncated: " + e.getMessage(), - e.getMessage().contains("operation size is corrupted must be"), equalTo(true)); - } + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-truncated.binary", "pre-2.0 translog"); } - public ImmutableTranslogReader openReader(Path path, long id) throws IOException { + public TranslogReader openReader(Path path, long id) throws IOException { FileChannel channel = FileChannel.open(path, StandardOpenOption.READ); try { - final ChannelReference raf = new ChannelReference(path, id, channel, null); - ImmutableTranslogReader reader = ImmutableTranslogReader.open(raf, new Checkpoint(Files.size(path), TranslogReader.UNKNOWN_OP_COUNT, id), null); + TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id), null); channel = null; return reader; } finally {