change internal fs translog to be less blocking, and easier to refactor later for better flush concurrency

This commit is contained in:
kimchy 2011-05-19 11:39:18 +03:00
parent ed8d6bbcd3
commit ebd95b7eb8
18 changed files with 194 additions and 384 deletions

View File

@ -307,7 +307,7 @@ public class SimpleEngineBenchmark {
ThreadPool threadPool = new ThreadPool();
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
Engine engine = new RobinEngine(shardId, settings, new ThreadPool(), new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
Engine engine = new RobinEngine(shardId, settings, new ThreadPool(), new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog")), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
engine.start();

View File

@ -158,7 +158,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
if (indexShard.state() == IndexShardState.STARTED) {
// shardStatus.estimatedFlushableMemorySize = indexShard.estimateFlushableMemorySize();
shardStatus.translogId = indexShard.translog().currentId();
shardStatus.translogOperations = indexShard.translog().numberOfOperations();
shardStatus.translogOperations = indexShard.translog().estimatedNumberOfOperations();
Engine.Searcher searcher = indexShard.searcher();
try {
shardStatus.docs = new DocsStatus();

View File

@ -239,7 +239,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogLength = translogSnapshot.length();
lastTotalTranslogOperations = translogSnapshot.totalOperations();
lastTotalTranslogOperations = translogSnapshot.estimatedTotalOperations();
return snapshotStatus;
}
return null;

View File

@ -249,7 +249,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
if (snapshot.newTranslogCreated()) {
if (translogSnapshot.lengthInBytes() > 0) {
snapshotRequired = true;
expectedNumberOfOperations = translogSnapshot.totalOperations();
expectedNumberOfOperations = translogSnapshot.estimatedTotalOperations();
}
} else {
// if we have a commit point, check that we have all the files listed in it in the blob store
@ -269,20 +269,20 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
translogSnapshot.seekForward(snapshot.lastTranslogLength());
if (translogSnapshot.lengthInBytes() > 0) {
snapshotRequired = true;
expectedNumberOfOperations = translogSnapshot.totalOperations() - snapshot.lastTotalTranslogOperations();
expectedNumberOfOperations = translogSnapshot.estimatedTotalOperations() - snapshot.lastTotalTranslogOperations();
}
} // else (no operations, nothing to snapshot)
} else {
// a full translog snapshot is required
if (translogSnapshot.lengthInBytes() > 0) {
expectedNumberOfOperations = translogSnapshot.totalOperations();
expectedNumberOfOperations = translogSnapshot.estimatedTotalOperations();
snapshotRequired = true;
}
}
} else {
// no commit point, snapshot all the translog
if (translogSnapshot.lengthInBytes() > 0) {
expectedNumberOfOperations = translogSnapshot.totalOperations();
expectedNumberOfOperations = translogSnapshot.estimatedTotalOperations();
snapshotRequired = true;
}
}

View File

@ -60,7 +60,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(1));
TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(10));
if (sync.millis() > 0) {
this.indexShard.translog().syncOnEachOperation(false);
// we don't need to execute the sync on a different thread, just do it on the scheduler thread

View File

@ -93,7 +93,7 @@ public class IndexShardManagement extends AbstractIndexShardComponent implements
@ManagedAttribute(description = "Number of transaction log operations")
public long getTranslogNumberOfOperations() {
return translog.numberOfOperations();
return translog.estimatedNumberOfOperations();
}
@ManagedAttribute(description = "Estimated size in memory the transaction log takes")

View File

@ -48,7 +48,7 @@ public interface Translog extends IndexShardComponent {
/**
* Returns the number of operations in the transaction log.
*/
int numberOfOperations();
int estimatedNumberOfOperations();
/**
* The estimated memory size this translog is taking.
@ -61,14 +61,7 @@ public interface Translog extends IndexShardComponent {
long translogSizeInBytes();
/**
* Creates a new transaction log internally. Note, users of this class should make
* sure that no operations are performed on the trans log when this is called.
*/
void newTranslog() throws TranslogException;
/**
* Creates a new transaction log internally. Note, users of this class should make
* sure that no operations are performed on the trans log when this is called.
* Creates a new transaction log internally.
*/
void newTranslog(long id) throws TranslogException;
@ -127,12 +120,7 @@ public interface Translog extends IndexShardComponent {
/**
* The total number of operations in the translog.
*/
int totalOperations();
/**
* The number of operations in this snapshot.
*/
int snapshotOperations();
int estimatedTotalOperations();
boolean hasNext();

View File

@ -129,7 +129,7 @@ public class TranslogService extends AbstractIndexShardComponent {
return;
}
int currentNumberOfOperations = translog.numberOfOperations();
int currentNumberOfOperations = translog.estimatedNumberOfOperations();
if (currentNumberOfOperations > flushThresholdOperations) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentNumberOfOperations, flushThresholdOperations);
asyncFlushAndReschedule();

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.io.FileChannelInputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
@ -37,14 +36,10 @@ import java.nio.channels.FileChannel;
*/
public class FsChannelSnapshot implements Translog.Snapshot {
private final ShardId shardId;
private final long id;
private final int totalOperations;
private final int snapshotOperations;
private final RafReference raf;
private final FileChannel channel;
@ -57,14 +52,12 @@ public class FsChannelSnapshot implements Translog.Snapshot {
private ByteBuffer cacheBuffer;
public FsChannelSnapshot(ShardId shardId, long id, RafReference raf, long length, int totalOperations, int snapshotOperations) throws FileNotFoundException {
this.shardId = shardId;
public FsChannelSnapshot(long id, RafReference raf, long length, int totalOperations) throws FileNotFoundException {
this.id = id;
this.raf = raf;
this.channel = raf.raf().getChannel();
this.length = length;
this.totalOperations = totalOperations;
this.snapshotOperations = snapshotOperations;
}
@Override public long translogId() {
@ -79,14 +72,10 @@ public class FsChannelSnapshot implements Translog.Snapshot {
return this.length;
}
@Override public int totalOperations() {
@Override public int estimatedTotalOperations() {
return this.totalOperations;
}
@Override public int snapshotOperations() {
return this.snapshotOperations;
}
@Override public InputStream stream() throws IOException {
return new FileChannelInputStream(channel, position, lengthInBytes());
}
@ -143,4 +132,4 @@ public class FsChannelSnapshot implements Translog.Snapshot {
raf.decreaseRefCount(true);
return true;
}
}
}

View File

@ -1,142 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.*;
/**
* @author kimchy (shay.banon)
*/
public class FsStreamSnapshot implements Translog.Snapshot {
private final ShardId shardId;
private final long id;
private final int totalOperations;
private final int snapshotOperations;
private final RafReference raf;
private final long length;
private final DataInputStream dis;
private Translog.Operation lastOperationRead = null;
private int position = 0;
private byte[] cachedData;
public FsStreamSnapshot(ShardId shardId, long id, RafReference raf, long length, int totalOperations, int snapshotOperations) throws FileNotFoundException {
this.shardId = shardId;
this.id = id;
this.raf = raf;
this.length = length;
this.totalOperations = totalOperations;
this.snapshotOperations = snapshotOperations;
this.dis = new DataInputStream(new FileInputStream(raf.file()));
}
@Override public long translogId() {
return this.id;
}
@Override public long position() {
return this.position;
}
@Override public long length() {
return this.length;
}
@Override public int totalOperations() {
return this.totalOperations;
}
@Override public int snapshotOperations() {
return this.snapshotOperations;
}
@Override public InputStream stream() throws IOException {
return dis;
}
@Override public long lengthInBytes() {
return length - position;
}
@Override public boolean hasNext() {
try {
if (position > length) {
return false;
}
int opSize = dis.readInt();
position += 4;
if ((position + opSize) > length) {
// restore the position to before we read the opSize
position -= 4;
return false;
}
position += opSize;
if (cachedData == null) {
cachedData = new byte[opSize];
} else if (cachedData.length < opSize) {
cachedData = new byte[opSize];
}
dis.readFully(cachedData, 0, opSize);
lastOperationRead = TranslogStreams.readTranslogOperation(new BytesStreamInput(cachedData, 0, opSize));
return true;
} catch (Exception e) {
return false;
}
}
@Override public Translog.Operation next() {
return this.lastOperationRead;
}
@Override public void seekForward(long length) {
this.position += length;
try {
this.dis.skip(length);
} catch (IOException e) {
throw new TranslogException(shardId, "failed to seek forward", e);
}
}
@Override public boolean release() throws ElasticSearchException {
try {
dis.close();
} catch (IOException e) {
// ignore
}
raf.decreaseRefCount(true);
return true;
}
}

View File

@ -33,9 +33,6 @@ import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
@ -44,37 +41,20 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private final File location;
private final boolean useStream;
private final Object mutex = new Object();
private volatile FsTranslogFile current;
private boolean syncOnEachOperation = false;
private volatile long id = 0;
private final AtomicInteger operationCounter = new AtomicInteger();
private AtomicLong lastPosition = new AtomicLong(0);
private AtomicLong lastWrittenPosition = new AtomicLong(0);
private RafReference raf;
@Inject public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
super(shardId, indexSettings);
this.location = new File(nodeEnv.shardLocation(shardId), "translog");
this.location.mkdirs();
this.useStream = componentSettings.getAsBoolean("use_stream", false);
}
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location) {
this(shardId, indexSettings, location, false);
}
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File location, boolean useStream) {
super(shardId, indexSettings);
this.location = location;
this.location.mkdirs();
this.useStream = useStream;
}
public File location() {
@ -82,11 +62,19 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
@Override public long currentId() {
return this.id;
FsTranslogFile current1 = this.current;
if (current1 == null) {
return -1;
}
return current1.id();
}
@Override public int numberOfOperations() {
return operationCounter.get();
@Override public int estimatedNumberOfOperations() {
FsTranslogFile current1 = this.current;
if (current1 == null) {
return 0;
}
return current1.estimatedNumberOfOperations();
}
@Override public long memorySizeInBytes() {
@ -94,63 +82,45 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
@Override public long translogSizeInBytes() {
return lastWrittenPosition.get();
FsTranslogFile current1 = this.current;
if (current1 == null) {
return 0;
}
return current1.translogSizeInBytes();
}
@Override public void clearUnreferenced() {
synchronized (mutex) {
File[] files = location.listFiles();
if (files != null) {
for (File file : files) {
if (file.getName().equals("translog-" + id)) {
continue;
}
try {
file.delete();
} catch (Exception e) {
// ignore
}
File[] files = location.listFiles();
if (files != null) {
for (File file : files) {
if (file.getName().equals("translog-" + current.id())) {
continue;
}
try {
file.delete();
} catch (Exception e) {
// ignore
}
}
}
}
@Override public void newTranslog() throws TranslogException {
synchronized (mutex) {
operationCounter.set(0);
lastPosition.set(0);
lastWrittenPosition.set(0);
this.id = id + 1;
if (raf != null) {
raf.decreaseRefCount(true);
}
try {
raf = new RafReference(new File(location, "translog-" + id));
raf.raf().setLength(0);
} catch (IOException e) {
raf = null;
throw new TranslogException(shardId, "translog not found", e);
}
}
}
@Override public void newTranslog(long id) throws TranslogException {
synchronized (mutex) {
operationCounter.set(0);
lastPosition.set(0);
lastWrittenPosition.set(0);
this.id = id;
if (raf != null) {
raf.decreaseRefCount(true);
}
try {
raf = new RafReference(new File(location, "translog-" + id));
// clean the file if it exists
raf.raf().setLength(0);
} catch (IOException e) {
raf = null;
throw new TranslogException(shardId, "translog not found", e);
FsTranslogFile newFile;
try {
newFile = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
FsTranslogFile old = current;
current = newFile;
if (old != null) {
// we might create a new translog overriding the current translog id
boolean delete = true;
if (old.id() == id) {
delete = false;
}
old.close(delete);
}
}
@ -165,85 +135,50 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
out.seek(0);
out.writeInt(size - 4);
long position = lastPosition.getAndAdd(size);
// use channel#write and not raf#write since it allows for concurrent writes
// with regards to positions
raf.channel().write(ByteBuffer.wrap(out.unsafeByteArray(), 0, size), position);
current.add(out.unsafeByteArray(), 0, size);
if (syncOnEachOperation) {
raf.channel().force(false);
}
synchronized (mutex) {
lastWrittenPosition.getAndAdd(size);
operationCounter.incrementAndGet();
current.sync();
}
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
}
}
@Override public Snapshot snapshot() throws TranslogException {
synchronized (mutex) {
try {
raf.increaseRefCount();
raf.channel().force(true); // sync here, so we make sure we read back teh data?
if (useStream) {
return new FsStreamSnapshot(shardId, this.id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get());
} else {
return new FsChannelSnapshot(shardId, this.id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get());
}
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
@Override public FsChannelSnapshot snapshot() throws TranslogException {
while (true) {
FsChannelSnapshot snapshot = current.snapshot();
if (snapshot != null) {
return snapshot;
}
Thread.yield();
}
}
@Override public Snapshot snapshot(Snapshot snapshot) {
synchronized (mutex) {
if (currentId() != snapshot.translogId()) {
return snapshot();
}
try {
raf.increaseRefCount();
raf.channel().force(true); // sync here, so we make sure we read back teh data?
if (useStream) {
FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get() - snapshot.totalOperations());
newSnapshot.seekForward(snapshot.position());
return newSnapshot;
} else {
FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get() - snapshot.totalOperations());
newSnapshot.seekForward(snapshot.position());
return newSnapshot;
}
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
}
FsChannelSnapshot snap = snapshot();
if (snap.translogId() == snapshot.translogId()) {
snap.seekForward(snapshot.position());
}
return snap;
}
@Override public void sync() {
synchronized (mutex) {
if (raf != null) {
try {
raf.channel().force(true);
} catch (Exception e) {
// ignore
}
}
FsTranslogFile current1 = this.current;
if (current1 == null) {
return;
}
current1.sync();
}
@Override public void syncOnEachOperation(boolean syncOnEachOperation) {
synchronized (mutex) {
this.syncOnEachOperation = syncOnEachOperation;
}
this.syncOnEachOperation = syncOnEachOperation;
}
@Override public void close(boolean delete) {
synchronized (mutex) {
if (raf != null) {
raf.decreaseRefCount(delete);
raf = null;
}
FsTranslogFile current1 = this.current;
if (current1 == null) {
return;
}
current1.close(delete);
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class FsTranslogFile {
private final long id;
private final ShardId shardId;
private final RafReference raf;
private final AtomicInteger operationCounter = new AtomicInteger();
private final AtomicLong lastPosition = new AtomicLong(0);
private final AtomicLong lastWrittenPosition = new AtomicLong(0);
public FsTranslogFile(ShardId shardId, long id, RafReference raf) throws IOException {
this.shardId = shardId;
this.id = id;
this.raf = raf;
raf.raf().setLength(0);
}
public long id() {
return this.id;
}
public int estimatedNumberOfOperations() {
return operationCounter.get();
}
public long translogSizeInBytes() {
return lastWrittenPosition.get();
}
public void add(byte[] data, int from, int size) throws IOException {
long position = lastPosition.getAndAdd(size);
raf.channel().write(ByteBuffer.wrap(data, from, size), position);
lastWrittenPosition.getAndAdd(size);
operationCounter.incrementAndGet();
}
public void close(boolean delete) {
raf.decreaseRefCount(delete);
}
/**
* Returns a snapshot on this file, <tt>null</tt> if it failed to snapshot.
*/
public FsChannelSnapshot snapshot() throws TranslogException {
try {
if (!raf.increaseRefCount()) {
return null;
}
return new FsChannelSnapshot(this.id, raf, lastWrittenPosition.get(), operationCounter.get());
} catch (Exception e) {
throw new TranslogException(shardId, "Failed to snapshot", e);
}
}
public void sync() {
try {
raf.channel().force(false);
} catch (Exception e) {
// ignore
}
}
}

View File

@ -19,8 +19,6 @@
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.io.FileSystemUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -60,8 +58,12 @@ public class RafReference {
return this.raf;
}
public void increaseRefCount() {
refCount.incrementAndGet();
/**
* Increases the ref count, and returns <tt>true</tt> if it managed to
* actually increment it.
*/
public boolean increaseRefCount() {
return refCount.incrementAndGet() > 1;
}
public void decreaseRefCount(boolean delete) {
@ -70,8 +72,6 @@ public class RafReference {
raf.close();
if (delete) {
file.delete();
} else {
FileSystemUtils.syncFile(file);
}
} catch (IOException e) {
// ignore

View File

@ -133,7 +133,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
continue;
}
// check if it is deemed to be inactive (sam translogId and numberOfOperations over a long period of time)
if (status.translogId == translog.currentId() && translog.numberOfOperations() == 0) {
if (status.translogId == translog.currentId() && translog.estimatedNumberOfOperations() == 0) {
if (status.time == -1) { // first time
status.time = time;
}
@ -156,7 +156,7 @@ public class IndexingMemoryBufferController extends AbstractLifecycleComponent<I
status.time = -1;
}
status.translogId = translog.currentId();
status.translogNumberOfOperations = translog.numberOfOperations();
status.translogNumberOfOperations = translog.estimatedNumberOfOperations();
}
}
if (activeInactiveStatusChanges) {

View File

@ -100,11 +100,11 @@ public abstract class AbstractSimpleEngineTests {
}
protected Translog createTranslog() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog/primary"), false);
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog/primary"));
}
protected Translog createTranslogReplica() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog/replica"), false);
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog/replica"));
}
protected IndexDeletionPolicy createIndexDeletionPolicy() {

View File

@ -42,7 +42,7 @@ public abstract class AbstractSimpleTranslogTests {
@BeforeMethod public void setUp() {
translog = create();
translog.newTranslog();
translog.newTranslog(1);
}
@AfterMethod public void tearDown() {
@ -59,29 +59,25 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
assertThat(snapshot.totalOperations(), equalTo(1));
assertThat(snapshot.snapshotOperations(), equalTo(1));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.release();
translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(2));
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.snapshotOperations(), equalTo(2));
assertThat(snapshot.estimatedTotalOperations(), equalTo(2));
snapshot.release();
translog.add(new Translog.Delete(newUid("3")));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(3));
assertThat(snapshot.totalOperations(), equalTo(3));
assertThat(snapshot.snapshotOperations(), equalTo(3));
assertThat(snapshot.estimatedTotalOperations(), equalTo(3));
snapshot.release();
translog.add(new Translog.DeleteByQuery(new byte[]{4}, null));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(4));
assertThat(snapshot.totalOperations(), equalTo(4));
assertThat(snapshot.snapshotOperations(), equalTo(4));
assertThat(snapshot.estimatedTotalOperations(), equalTo(4));
snapshot.release();
snapshot = translog.snapshot();
@ -107,13 +103,12 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.release();
long firstId = translog.currentId();
translog.newTranslog();
translog.newTranslog(2);
assertThat(translog.currentId(), Matchers.not(equalTo(firstId)));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(0));
assertThat(snapshot.totalOperations(), equalTo(0));
assertThat(snapshot.snapshotOperations(), equalTo(0));
assertThat(snapshot.estimatedTotalOperations(), equalTo(0));
snapshot.release();
}
@ -125,8 +120,7 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
assertThat(snapshot.totalOperations(), equalTo(1));
assertThat(snapshot.snapshotOperations(), equalTo(1));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.release();
snapshot = translog.snapshot();
@ -139,14 +133,12 @@ public abstract class AbstractSimpleTranslogTests {
// we use the translogSize to also navigate to the last position on this snapshot
// so snapshot(Snapshot) will work properly
assertThat(snapshot1, translogSize(1));
assertThat(snapshot1.totalOperations(), equalTo(1));
assertThat(snapshot1.snapshotOperations(), equalTo(1));
assertThat(snapshot1.estimatedTotalOperations(), equalTo(1));
translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot(snapshot1);
assertThat(snapshot, translogSize(1));
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.snapshotOperations(), equalTo(1));
assertThat(snapshot.estimatedTotalOperations(), equalTo(2));
snapshot.release();
snapshot = translog.snapshot(snapshot1);
@ -154,8 +146,7 @@ public abstract class AbstractSimpleTranslogTests {
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index.source(), equalTo(new byte[]{2}));
assertThat(snapshot.hasNext(), equalTo(false));
assertThat(snapshot.totalOperations(), equalTo(2));
assertThat(snapshot.snapshotOperations(), equalTo(1));
assertThat(snapshot.estimatedTotalOperations(), equalTo(2));
snapshot.release();
snapshot1.release();
}
@ -170,7 +161,7 @@ public abstract class AbstractSimpleTranslogTests {
translog.add(new Translog.Index("test", "2", new byte[]{2}));
translog.newTranslog();
translog.newTranslog(2);
translog.add(new Translog.Index("test", "3", new byte[]{3}));

View File

@ -34,10 +34,10 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
public class FsChannelSimpleTranslogTests extends AbstractSimpleTranslogTests {
@Override protected Translog create() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false);
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"));
}
@AfterClass public void cleanup() {
FileSystemUtils.deleteRecursively(new File("work/fs-translog"), true);
}
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog.fs;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.index.translog.AbstractSimpleTranslogTests;
import org.elasticsearch.index.translog.Translog;
import org.testng.annotations.AfterClass;
import java.io.File;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (shay.banon)
*/
public class FsStreamSimpleTranslogTests extends AbstractSimpleTranslogTests {
@Override protected Translog create() {
return new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), true);
}
@AfterClass public void cleanup() {
FileSystemUtils.deleteRecursively(new File("work/fs-translog"), true);
}
}