allow for concurrent flushing while indexing

This commit is contained in:
kimchy 2011-05-19 19:29:02 +03:00
parent 1911368feb
commit d5759efed7
5 changed files with 113 additions and 17 deletions

View File

@ -305,7 +305,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis())); versionMap.put(create.uid().text(), new VersionValue(create.version(), false, threadPool.estimatedTimeInMillis()));
} }
uidField.version(create.version()); uidField.version(create.version());
writer.addDocument(create.doc(), create.analyzer()); // we use update doc and not addDoc since we might get duplicates when using transient translog
writer.updateDocument(create.uid(), create.doc(), create.analyzer());
translog.add(new Translog.Create(create)); translog.add(new Translog.Create(create));
} else { } else {
long currentVersion; long currentVersion;
@ -687,6 +688,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// We can't do prepareCommit here, since we rely on the the segment version for the translog version // We can't do prepareCommit here, since we rely on the the segment version for the translog version
try { try {
if (flush.full()) {
rwl.writeLock().lock(); rwl.writeLock().lock();
try { try {
if (indexWriter == null) { if (indexWriter == null) {
@ -695,7 +698,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (disableFlushCounter > 0) { if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
} }
if (flush.full()) {
// disable refreshing, not dirty // disable refreshing, not dirty
dirty = false; dirty = false;
try { try {
@ -721,13 +723,26 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
failEngine(e); failEngine(e);
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} }
} finally {
rwl.writeLock().unlock();
}
} else { } else {
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
if (flushNeeded) { if (flushNeeded) {
flushNeeded = false; flushNeeded = false;
try { try {
long translogId = translogIdGenerator.incrementAndGet(); long translogId = translogIdGenerator.incrementAndGet();
translog.newTransientTranslog(translogId);
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map()); indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
translog.newTranslog(translogId); translog.makeTransientCurrent();
} catch (Exception e) { } catch (Exception e) {
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
@ -735,10 +750,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new FlushFailedEngineException(shardId, e); throw new FlushFailedEngineException(shardId, e);
} }
} }
}
} finally { } finally {
rwl.writeLock().unlock(); rwl.readLock().unlock();
} }
}
// we need to refresh in order to clear older version values // we need to refresh in order to clear older version values
long time = threadPool.estimatedTimeInMillis(); // mark time here, before we refresh, and then delete all older values long time = threadPool.estimatedTimeInMillis(); // mark time here, before we refresh, and then delete all older values

View File

@ -64,9 +64,26 @@ public interface Translog extends IndexShardComponent {
/** /**
* Creates a new transaction log internally. * Creates a new transaction log internally.
*
* <p>Can only be called by one thread.
*/ */
void newTranslog(long id) throws TranslogException; void newTranslog(long id) throws TranslogException;
/**
* Creates a new transient translog, where added ops will be added to the current one, and to
* it.
*
* <p>Can only be called by one thread.
*/
void newTransientTranslog(long id) throws TranslogException;
/**
* Swaps the transient translog to be the current one.
*
* <p>Can only be called by one thread.
*/
void makeTransientCurrent();
/** /**
* Adds a create operation to the transaction log. * Adds a create operation to the transaction log.
*/ */
@ -98,6 +115,8 @@ public interface Translog extends IndexShardComponent {
/** /**
* Closes the transaction log. * Closes the transaction log.
*
* <p>Can only be called by one thread.
*/ */
void close(boolean delete); void close(boolean delete);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -42,6 +43,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private final File location; private final File location;
private volatile FsTranslogFile current; private volatile FsTranslogFile current;
private volatile FsTranslogFile trans;
private boolean syncOnEachOperation = false; private boolean syncOnEachOperation = false;
@ -124,6 +126,19 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
} }
@Override public void newTransientTranslog(long id) throws TranslogException {
try {
this.trans = new FsTranslogFile(shardId, id, new RafReference(new File(location, "translog-" + id)));
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
}
@Override public void makeTransientCurrent() {
this.current = this.trans;
this.trans = null;
}
@Override public void add(Operation operation) throws TranslogException { @Override public void add(Operation operation) throws TranslogException {
try { try {
BytesStreamOutput out = CachedStreamOutput.cachedBytes(); BytesStreamOutput out = CachedStreamOutput.cachedBytes();
@ -139,6 +154,14 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
if (syncOnEachOperation) { if (syncOnEachOperation) {
current.sync(); current.sync();
} }
FsTranslogFile trans = this.trans;
if (trans != null) {
try {
trans.add(out.unsafeByteArray(), 0, size);
} catch (ClosedChannelException e) {
// ignore
}
}
} catch (Exception e) { } catch (Exception e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} }
@ -176,9 +199,12 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
@Override public void close(boolean delete) { @Override public void close(boolean delete) {
FsTranslogFile current1 = this.current; FsTranslogFile current1 = this.current;
if (current1 == null) { if (current1 != null) {
return;
}
current1.close(delete); current1.close(delete);
} }
current1 = this.trans;
if (current1 != null) {
current1.close(delete);
}
}
} }

View File

@ -51,6 +51,38 @@ public abstract class AbstractSimpleTranslogTests {
protected abstract Translog create(); protected abstract Translog create();
@Test public void testTransientTranslog() {
Translog.Snapshot snapshot = translog.snapshot();
assertThat(snapshot, translogSize(0));
snapshot.release();
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.release();
translog.newTransientTranslog(2);
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.release();
translog.add(new Translog.Index("test", "2", new byte[]{2}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(2));
assertThat(snapshot.estimatedTotalOperations(), equalTo(2));
snapshot.release();
translog.makeTransientCurrent();
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1)); // now its one, since it only includes "2"
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.release();
}
@Test public void testSimpleOperations() { @Test public void testSimpleOperations() {
Translog.Snapshot snapshot = translog.snapshot(); Translog.Snapshot snapshot = translog.snapshot();
assertThat(snapshot, translogSize(0)); assertThat(snapshot, translogSize(0));

View File

@ -67,6 +67,9 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
logger.info("**** starting indexing thread {}", indexerId); logger.info("**** starting indexing thread {}", indexerId);
while (!stop.get()) { while (!stop.get()) {
long id = idGenerator.incrementAndGet(); long id = idGenerator.incrementAndGet();
if (id % 1000 == 0) {
client("node1").admin().indices().prepareFlush().execute().actionGet();
}
client("node1").prepareIndex("test", "type1", Long.toString(id)) client("node1").prepareIndex("test", "type1", Long.toString(id))
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet(); .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
indexCounter.incrementAndGet(); indexCounter.incrementAndGet();