From 60b66a7235dc9eb30e5705a3d77096b28925664f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 18 May 2015 11:14:00 +0200 Subject: [PATCH] [TRANSLOG] Fold Translog.View into it's only implementation --- .../index/translog/Translog.java | 73 +++++++++---------- .../recovery/RecoverySourceHandler.java | 4 +- .../SharedFSRecoverySourceHandler.java | 32 +------- 3 files changed, 39 insertions(+), 70 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 8bc71be6576..3323a71006a 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -110,7 +110,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private volatile ScheduledFuture syncScheduler; // this is a concurrent set and is not protected by any of the locks. The main reason // is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed) - private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); + private final Set outstandingViews = ConcurrentCollections.newConcurrentSet(); private BigArrays bigArrays; protected final ReleasableLock readLock; protected final ReleasableLock writeLock; @@ -121,6 +121,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final String translogUUID; + private Callback onViewClose = new Callback() { + @Override + public void handle(View view) { + logger.trace("closing view starting at translog [{}]", view.minTranslogGeneration()); + outstandingViews.remove(this); + } + }; + /** @@ -475,7 +483,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - private Snapshot createSnapshot(TranslogReader... translogs) { + private static Snapshot createSnapshot(TranslogReader... translogs) { Snapshot[] snapshots = new Snapshot[translogs.length]; boolean success = false; try { @@ -507,7 +515,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC translogs.add(currentCommittingTranslog.clone()); } translogs.add(current.newReaderFromWriter()); - FsView view = new FsView(translogs); + View view = new View(translogs, onViewClose); // this is safe as we know that no new translog is being made at the moment // (we hold a read lock) and the view will be notified of any future one outstandingViews.add(view); @@ -615,16 +623,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * a view into the translog, capturing all translog file at the moment of creation * and updated with any future translog. */ - class FsView implements View { + public static final class View implements Closeable { + public static final Translog.View EMPTY_VIEW = new View(Collections.EMPTY_LIST, null); boolean closed; // last in this list is always FsTranslog.current final List orderedTranslogs; + private final Callback onClose; - FsView(List orderedTranslogs) { - assert orderedTranslogs.isEmpty() == false; + View(List orderedTranslogs, Callback onClose) { // clone so we can safely mutate.. this.orderedTranslogs = new ArrayList<>(orderedTranslogs); + this.onClose = onClose; } /** @@ -647,13 +657,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC orderedTranslogs.add(newCurrent); } - @Override + /** this smallest translog generation in this view */ public synchronized long minTranslogGeneration() { ensureOpen(); return orderedTranslogs.get(0).getGeneration(); } - @Override + /** + * The total number of operations in the view. + */ public synchronized int totalOperations() { int ops = 0; for (TranslogReader translog : orderedTranslogs) { @@ -667,7 +679,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return ops; } - @Override + /** + * Returns the size in bytes of the files behind the view. + */ public synchronized long sizeInBytes() { long size = 0; for (TranslogReader translog : orderedTranslogs) { @@ -676,6 +690,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return size; } + /** create a snapshot from this view */ public synchronized Snapshot snapshot() { ensureOpen(); return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); @@ -690,15 +705,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC @Override public void close() { - List toClose = new ArrayList<>(); + final List toClose = new ArrayList<>(); try { synchronized (this) { if (closed == false) { - logger.trace("closing view starting at translog [{}]", minTranslogGeneration()); - closed = true; - outstandingViews.remove(this); - toClose.addAll(orderedTranslogs); - orderedTranslogs.clear(); + try { + if (onClose != null) { + onClose.handle(this); + } + } finally { + closed = true; + toClose.addAll(orderedTranslogs); + orderedTranslogs.clear(); + } } } } finally { @@ -816,27 +835,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } - /** a view into the current translog that receives all operations from the moment created */ - public interface View extends Releasable { - - /** - * The total number of operations in the view. - */ - int totalOperations(); - - /** - * Returns the size in bytes of the files behind the view. - */ - long sizeInBytes(); - - /** create a snapshot from this view */ - Snapshot snapshot(); - - /** this smallest translog generation in this view */ - long minTranslogGeneration(); - - } - /** * A generic interface representing an operation performed on the transaction log. * Each is associated with a type. @@ -1666,7 +1664,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC current = createWriter(current.getGeneration() + 1); // notify all outstanding views of the new translog (no views are created now as // we hold a write lock). - for (FsView view : outstandingViews) { + for (View view : outstandingViews) { view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter()); } IOUtils.close(oldCurrent); @@ -1759,5 +1757,4 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 823742404e3..86cc680b56a 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -29,6 +29,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Nullable; @@ -56,6 +57,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.io.IOException; import java.util.Comparator; import java.util.List; @@ -123,7 +125,7 @@ public class RecoverySourceHandler { try { phase1Snapshot = shard.snapshotIndex(false); } catch (Throwable e) { - Releasables.closeWhileHandlingException(translogView); + IOUtils.closeWhileHandlingException(translogView); throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 9e80accf5e9..2b691f558e1 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -34,7 +34,6 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { private final IndexShard shard; private final StartRecoveryRequest request; - private static final Translog.View EMPTY_VIEW = new EmptyView(); public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) { super(shard, request, recoverySettings, transportService, logger); @@ -59,7 +58,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { shard.failShard("failed to close engine (phase1)", e); } } - prepareTargetForTranslog(EMPTY_VIEW); + prepareTargetForTranslog(Translog.View.EMPTY_VIEW); finalizeRecovery(); return response; } catch (Throwable t) { @@ -88,33 +87,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary(); } - /** - * An empty view since we don't recover from translog even in the shared FS case - */ - private static class EmptyView implements Translog.View { - - @Override - public int totalOperations() { - return 0; - } - - @Override - public long sizeInBytes() { - return 0; - } - - @Override - public Translog.Snapshot snapshot() { - return null; - } - - @Override - public long minTranslogGeneration() { - return 0; - } - - @Override - public void close() { - } - } }