[TRANSLOG] Fold Translog.View into it's only implementation

This commit is contained in:
Simon Willnauer 2015-05-18 11:14:00 +02:00
parent bba1528fa4
commit 60b66a7235
3 changed files with 39 additions and 70 deletions

View File

@ -110,7 +110,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private volatile ScheduledFuture<?> syncScheduler; private volatile ScheduledFuture<?> syncScheduler;
// this is a concurrent set and is not protected by any of the locks. The main reason // this is a concurrent set and is not protected by any of the locks. The main reason
// is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed) // is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed)
private final Set<FsView> outstandingViews = ConcurrentCollections.newConcurrentSet(); private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
private BigArrays bigArrays; private BigArrays bigArrays;
protected final ReleasableLock readLock; protected final ReleasableLock readLock;
protected final ReleasableLock writeLock; protected final ReleasableLock writeLock;
@ -121,6 +121,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config; private final TranslogConfig config;
private final String translogUUID; private final String translogUUID;
private Callback<View> onViewClose = new Callback<View>() {
@Override
public void handle(View view) {
logger.trace("closing view starting at translog [{}]", view.minTranslogGeneration());
outstandingViews.remove(this);
}
};
/** /**
@ -475,7 +483,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
} }
private Snapshot createSnapshot(TranslogReader... translogs) { private static Snapshot createSnapshot(TranslogReader... translogs) {
Snapshot[] snapshots = new Snapshot[translogs.length]; Snapshot[] snapshots = new Snapshot[translogs.length];
boolean success = false; boolean success = false;
try { try {
@ -507,7 +515,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
translogs.add(currentCommittingTranslog.clone()); translogs.add(currentCommittingTranslog.clone());
} }
translogs.add(current.newReaderFromWriter()); translogs.add(current.newReaderFromWriter());
FsView view = new FsView(translogs); View view = new View(translogs, onViewClose);
// this is safe as we know that no new translog is being made at the moment // this is safe as we know that no new translog is being made at the moment
// (we hold a read lock) and the view will be notified of any future one // (we hold a read lock) and the view will be notified of any future one
outstandingViews.add(view); outstandingViews.add(view);
@ -615,16 +623,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* a view into the translog, capturing all translog file at the moment of creation * a view into the translog, capturing all translog file at the moment of creation
* and updated with any future translog. * and updated with any future translog.
*/ */
class FsView implements View { public static final class View implements Closeable {
public static final Translog.View EMPTY_VIEW = new View(Collections.EMPTY_LIST, null);
boolean closed; boolean closed;
// last in this list is always FsTranslog.current // last in this list is always FsTranslog.current
final List<TranslogReader> orderedTranslogs; final List<TranslogReader> orderedTranslogs;
private final Callback<View> onClose;
FsView(List<TranslogReader> orderedTranslogs) { View(List<TranslogReader> orderedTranslogs, Callback<View> onClose) {
assert orderedTranslogs.isEmpty() == false;
// clone so we can safely mutate.. // clone so we can safely mutate..
this.orderedTranslogs = new ArrayList<>(orderedTranslogs); this.orderedTranslogs = new ArrayList<>(orderedTranslogs);
this.onClose = onClose;
} }
/** /**
@ -647,13 +657,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
orderedTranslogs.add(newCurrent); orderedTranslogs.add(newCurrent);
} }
@Override /** this smallest translog generation in this view */
public synchronized long minTranslogGeneration() { public synchronized long minTranslogGeneration() {
ensureOpen(); ensureOpen();
return orderedTranslogs.get(0).getGeneration(); return orderedTranslogs.get(0).getGeneration();
} }
@Override /**
* The total number of operations in the view.
*/
public synchronized int totalOperations() { public synchronized int totalOperations() {
int ops = 0; int ops = 0;
for (TranslogReader translog : orderedTranslogs) { for (TranslogReader translog : orderedTranslogs) {
@ -667,7 +679,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return ops; return ops;
} }
@Override /**
* Returns the size in bytes of the files behind the view.
*/
public synchronized long sizeInBytes() { public synchronized long sizeInBytes() {
long size = 0; long size = 0;
for (TranslogReader translog : orderedTranslogs) { for (TranslogReader translog : orderedTranslogs) {
@ -676,6 +690,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return size; return size;
} }
/** create a snapshot from this view */
public synchronized Snapshot snapshot() { public synchronized Snapshot snapshot() {
ensureOpen(); ensureOpen();
return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()])); return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()]));
@ -690,15 +705,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override @Override
public void close() { public void close() {
List<TranslogReader> toClose = new ArrayList<>(); final List<TranslogReader> toClose = new ArrayList<>();
try { try {
synchronized (this) { synchronized (this) {
if (closed == false) { if (closed == false) {
logger.trace("closing view starting at translog [{}]", minTranslogGeneration()); try {
closed = true; if (onClose != null) {
outstandingViews.remove(this); onClose.handle(this);
toClose.addAll(orderedTranslogs); }
orderedTranslogs.clear(); } finally {
closed = true;
toClose.addAll(orderedTranslogs);
orderedTranslogs.clear();
}
} }
} }
} finally { } finally {
@ -816,27 +835,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
/** a view into the current translog that receives all operations from the moment created */
public interface View extends Releasable {
/**
* The total number of operations in the view.
*/
int totalOperations();
/**
* Returns the size in bytes of the files behind the view.
*/
long sizeInBytes();
/** create a snapshot from this view */
Snapshot snapshot();
/** this smallest translog generation in this view */
long minTranslogGeneration();
}
/** /**
* A generic interface representing an operation performed on the transaction log. * A generic interface representing an operation performed on the transaction log.
* Each is associated with a type. * Each is associated with a type.
@ -1666,7 +1664,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
current = createWriter(current.getGeneration() + 1); current = createWriter(current.getGeneration() + 1);
// notify all outstanding views of the new translog (no views are created now as // notify all outstanding views of the new translog (no views are created now as
// we hold a write lock). // we hold a write lock).
for (FsView view : outstandingViews) { for (View view : outstandingViews) {
view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter()); view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter());
} }
IOUtils.close(oldCurrent); IOUtils.close(oldCurrent);
@ -1759,5 +1757,4 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
} }
} }

View File

@ -29,6 +29,7 @@ import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
@ -56,6 +57,7 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
@ -123,7 +125,7 @@ public class RecoverySourceHandler {
try { try {
phase1Snapshot = shard.snapshotIndex(false); phase1Snapshot = shard.snapshotIndex(false);
} catch (Throwable e) { } catch (Throwable e) {
Releasables.closeWhileHandlingException(translogView); IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e); throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
} }

View File

@ -34,7 +34,6 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard; private final IndexShard shard;
private final StartRecoveryRequest request; private final StartRecoveryRequest request;
private static final Translog.View EMPTY_VIEW = new EmptyView();
public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) { public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) {
super(shard, request, recoverySettings, transportService, logger); super(shard, request, recoverySettings, transportService, logger);
@ -59,7 +58,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
shard.failShard("failed to close engine (phase1)", e); shard.failShard("failed to close engine (phase1)", e);
} }
} }
prepareTargetForTranslog(EMPTY_VIEW); prepareTargetForTranslog(Translog.View.EMPTY_VIEW);
finalizeRecovery(); finalizeRecovery();
return response; return response;
} catch (Throwable t) { } catch (Throwable t) {
@ -88,33 +87,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary(); return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary();
} }
/**
* An empty view since we don't recover from translog even in the shared FS case
*/
private static class EmptyView implements Translog.View {
@Override
public int totalOperations() {
return 0;
}
@Override
public long sizeInBytes() {
return 0;
}
@Override
public Translog.Snapshot snapshot() {
return null;
}
@Override
public long minTranslogGeneration() {
return 0;
}
@Override
public void close() {
}
}
} }