diff --git a/src/main/java/org/elasticsearch/common/io/stream/NoopStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/NoopStreamOutput.java deleted file mode 100644 index c77cce8d9b7..00000000000 --- a/src/main/java/org/elasticsearch/common/io/stream/NoopStreamOutput.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.io.stream; - -import java.io.IOException; - -/** - * A non-threadsafe StreamOutput that doesn't actually write the bytes to any - * stream, it only keeps track of how many bytes have been written - */ -public final class NoopStreamOutput extends StreamOutput { - - private int count = 0; - - /** Retrieve the number of bytes that have been written */ - public int getCount() { - return count; - } - - @Override - public void writeByte(byte b) throws IOException { - count++; - } - - @Override - public void writeBytes(byte[] b, int offset, int length) throws IOException { - count += length; - } - - @Override - public void flush() throws IOException { - // no-op - } - - @Override - public void close() throws IOException { - // nothing to close - } - - @Override - public void reset() throws IOException { - count = 0; - } -} diff --git a/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java b/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java index 3c3fbf1ecbb..b99a6da5f22 100644 --- a/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java +++ b/src/main/java/org/elasticsearch/index/translog/BufferedChecksumStreamOutput.java @@ -70,4 +70,8 @@ public final class BufferedChecksumStreamOutput extends StreamOutput { out.reset(); digest.reset(); } + + public void resetDigest() { + digest.reset(); + } } diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 8bc71be6576..af04ec50dd3 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -110,7 +110,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private volatile ScheduledFuture syncScheduler; // this is a concurrent set and is not protected by any of the locks. The main reason // is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed) - private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); + private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); private BigArrays bigArrays; protected final ReleasableLock readLock; protected final ReleasableLock writeLock; @@ -121,6 +121,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final String translogUUID; + private Callback onViewClose = new Callback() { + @Override + public void handle(View view) { + logger.trace("closing view starting at translog [{}]", view.minTranslogGeneration()); + boolean removed = outstandingViews.remove(view); + assert removed : "View was never set but was supposed to be removed"; + } + }; + /** @@ -440,10 +449,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * @see org.elasticsearch.index.translog.Translog.Delete */ public Location add(Operation operation) throws TranslogException { - ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); + final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { - writeOperation(out, operation); - ReleasablePagedBytesReference bytes = out.bytes(); + final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); + final long start = out.position(); + out.skip(RamUsageEstimator.NUM_BYTES_INT); + writeOperationNoSize(checksumStreamOutput, operation); + final long end = out.position(); + final int operationSize = (int) (end - RamUsageEstimator.NUM_BYTES_INT - start); + out.seek(start); + out.writeInt(operationSize); + out.seek(end); + final ReleasablePagedBytesReference bytes = out.bytes(); try (ReleasableLock lock = readLock.acquire()) { Location location = current.add(bytes); if (config.isSyncOnEachOperation()) { @@ -475,7 +492,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - private Snapshot createSnapshot(TranslogReader... translogs) { + private static Snapshot createSnapshot(TranslogReader... translogs) { Snapshot[] snapshots = new Snapshot[translogs.length]; boolean success = false; try { @@ -507,7 +524,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC translogs.add(currentCommittingTranslog.clone()); } translogs.add(current.newReaderFromWriter()); - FsView view = new FsView(translogs); + View view = new View(translogs, onViewClose); // this is safe as we know that no new translog is being made at the moment // (we hold a read lock) and the view will be notified of any future one outstandingViews.add(view); @@ -615,16 +632,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * a view into the translog, capturing all translog file at the moment of creation * and updated with any future translog. */ - class FsView implements View { + public static final class View implements Closeable { + public static final Translog.View EMPTY_VIEW = new View(Collections.EMPTY_LIST, null); boolean closed; // last in this list is always FsTranslog.current final List orderedTranslogs; + private final Callback onClose; - FsView(List orderedTranslogs) { - assert orderedTranslogs.isEmpty() == false; + View(List orderedTranslogs, Callback onClose) { // clone so we can safely mutate.. this.orderedTranslogs = new ArrayList<>(orderedTranslogs); + this.onClose = onClose; } /** @@ -647,13 +666,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC orderedTranslogs.add(newCurrent); } - @Override + /** this smallest translog generation in this view */ public synchronized long minTranslogGeneration() { ensureOpen(); return orderedTranslogs.get(0).getGeneration(); } - @Override + /** + * The total number of operations in the view. + */ public synchronized int totalOperations() { int ops = 0; for (TranslogReader translog : orderedTranslogs) { @@ -667,7 +688,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return ops; } - @Override + /** + * Returns the size in bytes of the files behind the view. + */ public synchronized long sizeInBytes() { long size = 0; for (TranslogReader translog : orderedTranslogs) { @@ -676,6 +699,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return size; } + /** create a snapshot from this view */ public synchronized Snapshot snapshot() { ensureOpen(); return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); @@ -690,15 +714,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public void close() { - List toClose = new ArrayList<>(); + final List toClose = new ArrayList<>(); try { synchronized (this) { if (closed == false) { - logger.trace("closing view starting at translog [{}]", minTranslogGeneration()); - closed = true; - outstandingViews.remove(this); - toClose.addAll(orderedTranslogs); - orderedTranslogs.clear(); + try { + if (onClose != null) { + onClose.handle(this); + } + } finally { + closed = true; + toClose.addAll(orderedTranslogs); + orderedTranslogs.clear(); + } } } } finally { @@ -816,27 +844,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } - /** a view into the current translog that receives all operations from the moment created */ - public interface View extends Releasable { - - /** - * The total number of operations in the view. - */ - int totalOperations(); - - /** - * Returns the size in bytes of the files behind the view. - */ - long sizeInBytes(); - - /** create a snapshot from this view */ - Snapshot snapshot(); - - /** this smallest translog generation in this view */ - long minTranslogGeneration(); - - } - /** * A generic interface representing an operation performed on the transaction log. * Each is associated with a type. @@ -1548,29 +1555,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - public static Snapshot snapshotFromStream(StreamInput input, final int numOps) { + /** + * Reads a list of operations written with {@link #writeOperations(StreamOutput, List)} + */ + public static List readOperations(StreamInput input) throws IOException { + ArrayList operations = new ArrayList<>(); + int numOps = input.readInt(); final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input); - return new Snapshot() { - int read = 0; - @Override - public int estimatedTotalOperations() { - return numOps; - } - - @Override - public Operation next() throws IOException { - if (read < numOps) { - read++; - return readOperation(checksumStreamInput); - } - return null; - } - - @Override - public void close() { - // doNothing - } - }; + for (int i = 0; i < numOps; i++) { + operations.add(readOperation(checksumStreamInput)); + } + return operations; } static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException { @@ -1603,24 +1598,39 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return operation; } - public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException { - //TODO lets get rid of this crazy double writing here. + /** + * Writes all operations in the given iterable to the given output stream including the size of the array + * use {@link #readOperations(StreamInput)} to read it back. + */ + public static void writeOperations(StreamOutput outStream, List toWrite) throws IOException { + final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE); + try { + outStream.writeInt(toWrite.size()); + final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); + for (Operation op : toWrite) { + out.reset(); + final long start = out.position(); + out.skip(RamUsageEstimator.NUM_BYTES_INT); + writeOperationNoSize(checksumStreamOutput, op); + long end = out.position(); + int operationSize = (int) (out.position() - RamUsageEstimator.NUM_BYTES_INT - start); + out.seek(start); + out.writeInt(operationSize); + out.seek(end); + ReleasablePagedBytesReference bytes = out.bytes(); + bytes.writeTo(outStream); + } + } finally { + Releasables.close(out.bytes()); + } - // We first write to a NoopStreamOutput to get the size of the - // operation. We could write to a byte array and then send that as an - // alternative, but here we choose to use CPU over allocating new - // byte arrays. - NoopStreamOutput noopOut = new NoopStreamOutput(); - noopOut.writeByte(op.opType().id()); - op.writeTo(noopOut); - noopOut.writeInt(0); // checksum holder - int size = noopOut.getCount(); + } + public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Translog.Operation op) throws IOException { // This BufferedChecksumStreamOutput remains unclosed on purpose, // because closing it closes the underlying stream, which we don't // want to do here. - BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(outStream); - outStream.writeInt(size); // opSize is not checksummed + out.resetDigest(); out.writeByte(op.opType().id()); op.writeTo(out); long checksum = out.getChecksum(); @@ -1666,7 +1676,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC current = createWriter(current.getGeneration() + 1); // notify all outstanding views of the new translog (no views are created now as // we hold a write lock). - for (FsView view : outstandingViews) { + for (View view : outstandingViews) { view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter()); } IOUtils.close(oldCurrent); @@ -1759,5 +1769,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } + /** + * The number of currently open views + */ + int getNumOpenViews() { + return outstandingViews.size(); + } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 823742404e3..86cc680b56a 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,6 +29,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; @@ -56,6 +57,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.io.IOException; import java.util.Comparator; import java.util.List; @@ -123,7 +125,7 @@ public class RecoverySourceHandler { try { phase1Snapshot = shard.snapshotIndex(false); } catch (Throwable e) { - Releasables.closeWhileHandlingException(translogView); + IOUtils.closeWhileHandlingException(translogView); throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 8e377c729c7..b320c98568a 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -70,13 +70,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); - int size = in.readVInt(); - operations = Lists.newArrayListWithExpectedSize(size); - Translog.Snapshot snapshot = Translog.snapshotFromStream(in, size); - Translog.Operation next = null; - while((next = snapshot.next()) != null) { - operations.add(next); - } + operations = Translog.readOperations(in); totalTranslogOps = in.readVInt(); } @@ -85,10 +79,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest { super.writeTo(out); out.writeLong(recoveryId); shardId.writeTo(out); - out.writeVInt(operations.size()); - for (Translog.Operation operation : operations) { - Translog.writeOperation(out, operation); - } + Translog.writeOperations(out, operations); out.writeVInt(totalTranslogOps); } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 9e80accf5e9..2b691f558e1 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -34,7 +34,6 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { private final IndexShard shard; private final StartRecoveryRequest request; - private static final Translog.View EMPTY_VIEW = new EmptyView(); public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) { super(shard, request, recoverySettings, transportService, logger); @@ -59,7 +58,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { shard.failShard("failed to close engine (phase1)", e); } } - prepareTargetForTranslog(EMPTY_VIEW); + prepareTargetForTranslog(Translog.View.EMPTY_VIEW); finalizeRecovery(); return response; } catch (Throwable t) { @@ -88,33 +87,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary(); } - /** - * An empty view since we don't recover from translog even in the shared FS case - */ - private static class EmptyView implements Translog.View { - - @Override - public int totalOperations() { - return 0; - } - - @Override - public long sizeInBytes() { - return 0; - } - - @Override - public Translog.Snapshot snapshot() { - return null; - } - - @Override - public long minTranslogGeneration() { - return 0; - } - - @Override - public void close() { - } - } } diff --git a/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 18c53ecf16b..7b34427dfc4 100644 --- a/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -70,6 +71,7 @@ import static org.hamcrest.Matchers.*; /** * */ +@LuceneTestCase.SuppressFileSystems("ExtrasFS") public class TranslogTests extends ElasticsearchTestCase { protected final ShardId shardId = new ShardId(new Index("index"), 1); @@ -106,6 +108,7 @@ public class TranslogTests extends ElasticsearchTestCase { @After public void tearDown() throws Exception { try { + assertEquals("there are still open views", 0, translog.getNumOpenViews()); translog.close(); } finally { super.tearDown(); @@ -1021,23 +1024,18 @@ public class TranslogTests extends ElasticsearchTestCase { } - public void testSnapshotFromStreamInput() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); List ops = newArrayList(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { Translog.Create test = new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))); - Translog.writeOperation(out, test); ops.add(test); } - Translog.Snapshot snapshot = Translog.snapshotFromStream(StreamInput.wrap(out.bytes()), ops.size()); - assertEquals(ops.size(), snapshot.estimatedTotalOperations()); - for (Translog.Operation op : ops) { - assertEquals(op, snapshot.next()); - } - assertNull(snapshot.next()); - // no need to close + Translog.writeOperations(out, ops); + final List readOperations = Translog.readOperations(StreamInput.wrap(out.bytes())); + assertEquals(ops.size(), readOperations.size()); + assertEquals(ops, readOperations); } public void testLocationHashCodeEquals() throws IOException {