Merge pull request #11200 from s1monw/cleanup_translog
Some smallish translog cleanups
This commit is contained in:
commit
91d0580b0d
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.io.stream;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A non-threadsafe StreamOutput that doesn't actually write the bytes to any
|
||||
* stream, it only keeps track of how many bytes have been written
|
||||
*/
|
||||
public final class NoopStreamOutput extends StreamOutput {
|
||||
|
||||
private int count = 0;
|
||||
|
||||
/** Retrieve the number of bytes that have been written */
|
||||
public int getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws IOException {
|
||||
count++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] b, int offset, int length) throws IOException {
|
||||
count += length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// nothing to close
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
count = 0;
|
||||
}
|
||||
}
|
|
@ -70,4 +70,8 @@ public final class BufferedChecksumStreamOutput extends StreamOutput {
|
|||
out.reset();
|
||||
digest.reset();
|
||||
}
|
||||
|
||||
public void resetDigest() {
|
||||
digest.reset();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -110,7 +110,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
private volatile ScheduledFuture<?> syncScheduler;
|
||||
// this is a concurrent set and is not protected by any of the locks. The main reason
|
||||
// is that is being accessed by two separate classes (additions & reading are done by FsTranslog, remove by FsView when closed)
|
||||
private final Set<FsView> outstandingViews = ConcurrentCollections.newConcurrentSet();
|
||||
private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
|
||||
private BigArrays bigArrays;
|
||||
protected final ReleasableLock readLock;
|
||||
protected final ReleasableLock writeLock;
|
||||
|
@ -121,6 +121,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final TranslogConfig config;
|
||||
private final String translogUUID;
|
||||
private Callback<View> onViewClose = new Callback<View>() {
|
||||
@Override
|
||||
public void handle(View view) {
|
||||
logger.trace("closing view starting at translog [{}]", view.minTranslogGeneration());
|
||||
boolean removed = outstandingViews.remove(view);
|
||||
assert removed : "View was never set but was supposed to be removed";
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
@ -440,10 +449,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
* @see org.elasticsearch.index.translog.Translog.Delete
|
||||
*/
|
||||
public Location add(Operation operation) throws TranslogException {
|
||||
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
|
||||
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
|
||||
try {
|
||||
writeOperation(out, operation);
|
||||
ReleasablePagedBytesReference bytes = out.bytes();
|
||||
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
|
||||
final long start = out.position();
|
||||
out.skip(RamUsageEstimator.NUM_BYTES_INT);
|
||||
writeOperationNoSize(checksumStreamOutput, operation);
|
||||
final long end = out.position();
|
||||
final int operationSize = (int) (end - RamUsageEstimator.NUM_BYTES_INT - start);
|
||||
out.seek(start);
|
||||
out.writeInt(operationSize);
|
||||
out.seek(end);
|
||||
final ReleasablePagedBytesReference bytes = out.bytes();
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
Location location = current.add(bytes);
|
||||
if (config.isSyncOnEachOperation()) {
|
||||
|
@ -475,7 +492,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
private Snapshot createSnapshot(TranslogReader... translogs) {
|
||||
private static Snapshot createSnapshot(TranslogReader... translogs) {
|
||||
Snapshot[] snapshots = new Snapshot[translogs.length];
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -507,7 +524,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
translogs.add(currentCommittingTranslog.clone());
|
||||
}
|
||||
translogs.add(current.newReaderFromWriter());
|
||||
FsView view = new FsView(translogs);
|
||||
View view = new View(translogs, onViewClose);
|
||||
// this is safe as we know that no new translog is being made at the moment
|
||||
// (we hold a read lock) and the view will be notified of any future one
|
||||
outstandingViews.add(view);
|
||||
|
@ -615,16 +632,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
* a view into the translog, capturing all translog file at the moment of creation
|
||||
* and updated with any future translog.
|
||||
*/
|
||||
class FsView implements View {
|
||||
public static final class View implements Closeable {
|
||||
public static final Translog.View EMPTY_VIEW = new View(Collections.EMPTY_LIST, null);
|
||||
|
||||
boolean closed;
|
||||
// last in this list is always FsTranslog.current
|
||||
final List<TranslogReader> orderedTranslogs;
|
||||
private final Callback<View> onClose;
|
||||
|
||||
FsView(List<TranslogReader> orderedTranslogs) {
|
||||
assert orderedTranslogs.isEmpty() == false;
|
||||
View(List<TranslogReader> orderedTranslogs, Callback<View> onClose) {
|
||||
// clone so we can safely mutate..
|
||||
this.orderedTranslogs = new ArrayList<>(orderedTranslogs);
|
||||
this.onClose = onClose;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -647,13 +666,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
orderedTranslogs.add(newCurrent);
|
||||
}
|
||||
|
||||
@Override
|
||||
/** this smallest translog generation in this view */
|
||||
public synchronized long minTranslogGeneration() {
|
||||
ensureOpen();
|
||||
return orderedTranslogs.get(0).getGeneration();
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* The total number of operations in the view.
|
||||
*/
|
||||
public synchronized int totalOperations() {
|
||||
int ops = 0;
|
||||
for (TranslogReader translog : orderedTranslogs) {
|
||||
|
@ -667,7 +688,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
return ops;
|
||||
}
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Returns the size in bytes of the files behind the view.
|
||||
*/
|
||||
public synchronized long sizeInBytes() {
|
||||
long size = 0;
|
||||
for (TranslogReader translog : orderedTranslogs) {
|
||||
|
@ -676,6 +699,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
return size;
|
||||
}
|
||||
|
||||
/** create a snapshot from this view */
|
||||
public synchronized Snapshot snapshot() {
|
||||
ensureOpen();
|
||||
return createSnapshot(orderedTranslogs.toArray(new TranslogReader[orderedTranslogs.size()]));
|
||||
|
@ -690,17 +714,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
List<TranslogReader> toClose = new ArrayList<>();
|
||||
final List<TranslogReader> toClose = new ArrayList<>();
|
||||
try {
|
||||
synchronized (this) {
|
||||
if (closed == false) {
|
||||
logger.trace("closing view starting at translog [{}]", minTranslogGeneration());
|
||||
try {
|
||||
if (onClose != null) {
|
||||
onClose.handle(this);
|
||||
}
|
||||
} finally {
|
||||
closed = true;
|
||||
outstandingViews.remove(this);
|
||||
toClose.addAll(orderedTranslogs);
|
||||
orderedTranslogs.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
// Close out of lock to prevent deadlocks between channel close which checks for
|
||||
|
@ -816,27 +844,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
|
||||
}
|
||||
|
||||
/** a view into the current translog that receives all operations from the moment created */
|
||||
public interface View extends Releasable {
|
||||
|
||||
/**
|
||||
* The total number of operations in the view.
|
||||
*/
|
||||
int totalOperations();
|
||||
|
||||
/**
|
||||
* Returns the size in bytes of the files behind the view.
|
||||
*/
|
||||
long sizeInBytes();
|
||||
|
||||
/** create a snapshot from this view */
|
||||
Snapshot snapshot();
|
||||
|
||||
/** this smallest translog generation in this view */
|
||||
long minTranslogGeneration();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A generic interface representing an operation performed on the transaction log.
|
||||
* Each is associated with a type.
|
||||
|
@ -1548,29 +1555,17 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
public static Snapshot snapshotFromStream(StreamInput input, final int numOps) {
|
||||
/**
|
||||
* Reads a list of operations written with {@link #writeOperations(StreamOutput, List)}
|
||||
*/
|
||||
public static List<Operation> readOperations(StreamInput input) throws IOException {
|
||||
ArrayList<Operation> operations = new ArrayList<>();
|
||||
int numOps = input.readInt();
|
||||
final BufferedChecksumStreamInput checksumStreamInput = new BufferedChecksumStreamInput(input);
|
||||
return new Snapshot() {
|
||||
int read = 0;
|
||||
@Override
|
||||
public int estimatedTotalOperations() {
|
||||
return numOps;
|
||||
for (int i = 0; i < numOps; i++) {
|
||||
operations.add(readOperation(checksumStreamInput));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Operation next() throws IOException {
|
||||
if (read < numOps) {
|
||||
read++;
|
||||
return readOperation(checksumStreamInput);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// doNothing
|
||||
}
|
||||
};
|
||||
return operations;
|
||||
}
|
||||
|
||||
static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws IOException {
|
||||
|
@ -1603,24 +1598,39 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
return operation;
|
||||
}
|
||||
|
||||
public static void writeOperation(StreamOutput outStream, Translog.Operation op) throws IOException {
|
||||
//TODO lets get rid of this crazy double writing here.
|
||||
/**
|
||||
* Writes all operations in the given iterable to the given output stream including the size of the array
|
||||
* use {@link #readOperations(StreamInput)} to read it back.
|
||||
*/
|
||||
public static void writeOperations(StreamOutput outStream, List<Operation> toWrite) throws IOException {
|
||||
final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE);
|
||||
try {
|
||||
outStream.writeInt(toWrite.size());
|
||||
final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
|
||||
for (Operation op : toWrite) {
|
||||
out.reset();
|
||||
final long start = out.position();
|
||||
out.skip(RamUsageEstimator.NUM_BYTES_INT);
|
||||
writeOperationNoSize(checksumStreamOutput, op);
|
||||
long end = out.position();
|
||||
int operationSize = (int) (out.position() - RamUsageEstimator.NUM_BYTES_INT - start);
|
||||
out.seek(start);
|
||||
out.writeInt(operationSize);
|
||||
out.seek(end);
|
||||
ReleasablePagedBytesReference bytes = out.bytes();
|
||||
bytes.writeTo(outStream);
|
||||
}
|
||||
} finally {
|
||||
Releasables.close(out.bytes());
|
||||
}
|
||||
|
||||
// We first write to a NoopStreamOutput to get the size of the
|
||||
// operation. We could write to a byte array and then send that as an
|
||||
// alternative, but here we choose to use CPU over allocating new
|
||||
// byte arrays.
|
||||
NoopStreamOutput noopOut = new NoopStreamOutput();
|
||||
noopOut.writeByte(op.opType().id());
|
||||
op.writeTo(noopOut);
|
||||
noopOut.writeInt(0); // checksum holder
|
||||
int size = noopOut.getCount();
|
||||
}
|
||||
|
||||
public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Translog.Operation op) throws IOException {
|
||||
// This BufferedChecksumStreamOutput remains unclosed on purpose,
|
||||
// because closing it closes the underlying stream, which we don't
|
||||
// want to do here.
|
||||
BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(outStream);
|
||||
outStream.writeInt(size); // opSize is not checksummed
|
||||
out.resetDigest();
|
||||
out.writeByte(op.opType().id());
|
||||
op.writeTo(out);
|
||||
long checksum = out.getChecksum();
|
||||
|
@ -1666,7 +1676,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
current = createWriter(current.getGeneration() + 1);
|
||||
// notify all outstanding views of the new translog (no views are created now as
|
||||
// we hold a write lock).
|
||||
for (FsView view : outstandingViews) {
|
||||
for (View view : outstandingViews) {
|
||||
view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter());
|
||||
}
|
||||
IOUtils.close(oldCurrent);
|
||||
|
@ -1759,5 +1769,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of currently open views
|
||||
*/
|
||||
int getNumOpenViews() {
|
||||
return outstandingViews.size();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.lucene.store.IOContext;
|
|||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -56,6 +57,7 @@ import org.elasticsearch.transport.RemoteTransportException;
|
|||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
@ -123,7 +125,7 @@ public class RecoverySourceHandler {
|
|||
try {
|
||||
phase1Snapshot = shard.snapshotIndex(false);
|
||||
} catch (Throwable e) {
|
||||
Releasables.closeWhileHandlingException(translogView);
|
||||
IOUtils.closeWhileHandlingException(translogView);
|
||||
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
|
||||
}
|
||||
|
||||
|
|
|
@ -70,13 +70,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
|
|||
super.readFrom(in);
|
||||
recoveryId = in.readLong();
|
||||
shardId = ShardId.readShardId(in);
|
||||
int size = in.readVInt();
|
||||
operations = Lists.newArrayListWithExpectedSize(size);
|
||||
Translog.Snapshot snapshot = Translog.snapshotFromStream(in, size);
|
||||
Translog.Operation next = null;
|
||||
while((next = snapshot.next()) != null) {
|
||||
operations.add(next);
|
||||
}
|
||||
operations = Translog.readOperations(in);
|
||||
totalTranslogOps = in.readVInt();
|
||||
}
|
||||
|
||||
|
@ -85,10 +79,7 @@ class RecoveryTranslogOperationsRequest extends TransportRequest {
|
|||
super.writeTo(out);
|
||||
out.writeLong(recoveryId);
|
||||
shardId.writeTo(out);
|
||||
out.writeVInt(operations.size());
|
||||
for (Translog.Operation operation : operations) {
|
||||
Translog.writeOperation(out, operation);
|
||||
}
|
||||
Translog.writeOperations(out, operations);
|
||||
out.writeVInt(totalTranslogOps);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
|
||||
private final IndexShard shard;
|
||||
private final StartRecoveryRequest request;
|
||||
private static final Translog.View EMPTY_VIEW = new EmptyView();
|
||||
|
||||
public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ESLogger logger) {
|
||||
super(shard, request, recoverySettings, transportService, logger);
|
||||
|
@ -59,7 +58,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
shard.failShard("failed to close engine (phase1)", e);
|
||||
}
|
||||
}
|
||||
prepareTargetForTranslog(EMPTY_VIEW);
|
||||
prepareTargetForTranslog(Translog.View.EMPTY_VIEW);
|
||||
finalizeRecovery();
|
||||
return response;
|
||||
} catch (Throwable t) {
|
||||
|
@ -88,33 +87,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
|
|||
return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary();
|
||||
}
|
||||
|
||||
/**
|
||||
* An empty view since we don't recover from translog even in the shared FS case
|
||||
*/
|
||||
private static class EmptyView implements Translog.View {
|
||||
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sizeInBytes() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Snapshot snapshot() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long minTranslogGeneration() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.index.Term;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.ByteArrayDataOutput;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
|
@ -70,6 +71,7 @@ import static org.hamcrest.Matchers.*;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
|
||||
public class TranslogTests extends ElasticsearchTestCase {
|
||||
|
||||
protected final ShardId shardId = new ShardId(new Index("index"), 1);
|
||||
|
@ -106,6 +108,7 @@ public class TranslogTests extends ElasticsearchTestCase {
|
|||
@After
|
||||
public void tearDown() throws Exception {
|
||||
try {
|
||||
assertEquals("there are still open views", 0, translog.getNumOpenViews());
|
||||
translog.close();
|
||||
} finally {
|
||||
super.tearDown();
|
||||
|
@ -1021,23 +1024,18 @@ public class TranslogTests extends ElasticsearchTestCase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
public void testSnapshotFromStreamInput() throws IOException {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
List<Translog.Operation> ops = newArrayList();
|
||||
int translogOperations = randomIntBetween(10, 100);
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
Translog.Create test = new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
|
||||
Translog.writeOperation(out, test);
|
||||
ops.add(test);
|
||||
}
|
||||
Translog.Snapshot snapshot = Translog.snapshotFromStream(StreamInput.wrap(out.bytes()), ops.size());
|
||||
assertEquals(ops.size(), snapshot.estimatedTotalOperations());
|
||||
for (Translog.Operation op : ops) {
|
||||
assertEquals(op, snapshot.next());
|
||||
}
|
||||
assertNull(snapshot.next());
|
||||
// no need to close
|
||||
Translog.writeOperations(out, ops);
|
||||
final List<Translog.Operation> readOperations = Translog.readOperations(StreamInput.wrap(out.bytes()));
|
||||
assertEquals(ops.size(), readOperations.size());
|
||||
assertEquals(ops, readOperations);
|
||||
}
|
||||
|
||||
public void testLocationHashCodeEquals() throws IOException {
|
||||
|
|
Loading…
Reference in New Issue