Merge pull request #20034 from areek/cleanup/index_operation

Set created flag in index operation
This commit is contained in:
Areek Zillur 2016-08-29 12:34:24 -04:00 committed by GitHub
commit 99734ec576
6 changed files with 43 additions and 42 deletions

View File

@ -188,7 +188,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
final boolean created = indexShard.index(operation);
indexShard.index(operation);
// update the version on request so it will happen on the replicas
final long version = operation.version();
@ -197,7 +197,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
assert request.versionType().validateVersionForWrites(request.version());
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), created);
IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), operation.isCreated());
return new WriteResult<>(response, operation.getTranslogLocation());
}
}

View File

@ -277,7 +277,7 @@ public abstract class Engine implements Closeable {
}
}
public abstract boolean index(Index operation) throws EngineException;
public abstract void index(Index operation) throws EngineException;
public abstract void delete(Delete delete) throws EngineException;
@ -847,6 +847,7 @@ public abstract class Engine implements Closeable {
public static class Index extends Operation {
private final ParsedDocument doc;
private boolean created;
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
super(uid, version, versionType, origin, startTime);
@ -905,6 +906,14 @@ public abstract class Engine implements Closeable {
return this.doc.source();
}
public boolean isCreated() {
return created;
}
public void setCreated(boolean created) {
this.created = created;
}
@Override
protected int estimatedSizeInBytes() {
return (id().length() + type().length()) * 2 + source().length() + 12;

View File

@ -386,16 +386,15 @@ public class InternalEngine extends Engine {
}
@Override
public boolean index(Index index) {
final boolean created;
public void index(Index index) {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (index.origin().isRecovery()) {
// Don't throttle recovery operations
created = innerIndex(index);
innerIndex(index);
} else {
try (Releasable r = throttle.acquireThrottle()) {
created = innerIndex(index);
innerIndex(index);
}
}
} catch (IllegalStateException | IOException e) {
@ -406,10 +405,9 @@ public class InternalEngine extends Engine {
}
throw new IndexFailedEngineException(shardId, index.type(), index.id(), e);
}
return created;
}
private boolean innerIndex(Index index) throws IOException {
private void innerIndex(Index index) throws IOException {
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
final long currentVersion;
@ -424,15 +422,16 @@ public class InternalEngine extends Engine {
}
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false;
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
index.setCreated(false);
return;
}
final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
final boolean created = indexOrUpdate(index, currentVersion, versionValue);
indexOrUpdate(index, currentVersion, versionValue);
maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
return created;
}
}
@ -442,16 +441,14 @@ public class InternalEngine extends Engine {
return updatedVersion;
}
private boolean indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
final boolean created;
private void indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
created = true;
index.setCreated(true);
index(index, indexWriter);
} else {
created = update(index, versionValue, indexWriter);
update(index, versionValue, indexWriter);
}
return created;
}
private static void index(final Index index, final IndexWriter indexWriter) throws IOException {
@ -462,19 +459,17 @@ public class InternalEngine extends Engine {
}
}
private static boolean update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
final boolean created;
private static void update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
if (versionValue != null) {
created = versionValue.delete(); // we have a delete which is not GC'ed...
index.setCreated(versionValue.delete()); // we have a delete which is not GC'ed...
} else {
created = false;
index.setCreated(false);
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs());
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0));
}
return created;
}
@Override

View File

@ -106,7 +106,7 @@ public class ShadowEngine extends Engine {
@Override
public boolean index(Index index) throws EngineException {
public void index(Index index) throws EngineException {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}

View File

@ -523,34 +523,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Index(uid, doc, version, versionType, origin, startTime);
}
/**
* Index a document and return whether it was created, as opposed to just
* updated.
*/
public boolean index(Engine.Index index) {
public void index(Engine.Index index) {
ensureWriteAllowed(index);
Engine engine = getEngine();
return index(engine, index);
index(engine, index);
}
private boolean index(Engine engine, Engine.Index index) {
private void index(Engine engine, Engine.Index index) {
active.set(true);
index = indexingOperationListeners.preIndex(index);
final boolean created;
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
created = engine.index(index);
engine.index(index);
index.endTime(System.nanoTime());
} catch (Exception e) {
indexingOperationListeners.postIndex(index, e);
throw e;
}
indexingOperationListeners.postIndex(index, created);
return created;
indexingOperationListeners.postIndex(index, index.isCreated());
}
public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {

View File

@ -1475,28 +1475,33 @@ public class InternalEngineTests extends ESTestCase {
public void testBasicCreatedFlag() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
assertTrue(engine.index(index));
engine.index(index);
assertTrue(index.isCreated());
index = new Engine.Index(newUid("1"), doc);
assertFalse(engine.index(index));
engine.index(index);
assertFalse(index.isCreated());
engine.delete(new Engine.Delete(null, "1", newUid("1")));
index = new Engine.Index(newUid("1"), doc);
assertTrue(engine.index(index));
engine.index(index);
assertTrue(index.isCreated());
}
public void testCreatedFlagAfterFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
assertTrue(engine.index(index));
engine.index(index);
assertTrue(index.isCreated());
engine.delete(new Engine.Delete(null, "1", newUid("1")));
engine.flush();
index = new Engine.Index(newUid("1"), doc);
assertTrue(engine.index(index));
engine.index(index);
assertTrue(index.isCreated());
}
private static class MockAppender extends AppenderSkeleton {