Reduce code duplication in TransportIndexAction/TransportShardBulkAction.

We have some duplication in TransportIndexAction/TransportShardBulkAction due
to the fact that we have totally different branches for INDEX and CREATE
operations. This commit tries to share the logic better between these two cases.
This commit is contained in:
Adrien Grand 2015-04-27 10:19:59 +02:00
parent 8d30c9a392
commit d7d39e1938
7 changed files with 121 additions and 163 deletions

View File

@ -359,72 +359,46 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
long version;
boolean created;
Engine.IndexingOperation op;
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
version = index.version();
op = index;
created = index.created();
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
version = create.version();
op = create;
created = true;
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
// update the version on request so it will happen on the replicas
final long version = operation.version();
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.version(version);
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
IndexResponse indexResponse = new IndexResponse(request.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, op);
return new WriteResult(indexResponse, operation);
}
private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
@ -548,23 +522,20 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
@ -172,62 +173,39 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
boolean created;
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
version = index.version();
created = index.created();
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse,
operation = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
version = create.version();
created = true;
}
final boolean created;
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
@ -237,6 +215,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
// update the version on the request, so it will be used for the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
@ -250,22 +229,19 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexService.shardSafe(shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");

View File

@ -47,6 +47,7 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -200,7 +201,7 @@ public abstract class Engine implements Closeable {
public abstract void create(Create create) throws EngineException;
public abstract void index(Index index) throws EngineException;
public abstract boolean index(Index index) throws EngineException;
public abstract void delete(Delete delete) throws EngineException;
@ -704,6 +705,12 @@ public abstract class Engine implements Closeable {
public long endTime() {
return this.endTime;
}
/**
* Execute this operation against the provided {@link IndexShard} and
* return whether the document was created.
*/
public abstract boolean execute(IndexShard shard);
}
public static final class Create extends IndexingOperation {
@ -732,10 +739,15 @@ public abstract class Engine implements Closeable {
public boolean autoGeneratedId() {
return this.autoGeneratedId;
}
@Override
public boolean execute(IndexShard shard) {
shard.create(this);
return true;
}
}
public static final class Index extends IndexingOperation {
private boolean created;
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
super(docMapper, uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
@ -754,15 +766,9 @@ public abstract class Engine implements Closeable {
return Type.INDEX;
}
/**
* @return true if object was created
*/
public boolean created() {
return created;
}
public void created(boolean created) {
this.created = created;
@Override
public boolean execute(IndexShard shard) {
return shard.index(this);
}
}

View File

@ -364,15 +364,16 @@ public class InternalEngine extends Engine {
}
@Override
public void index(Index index) throws EngineException {
public boolean index(Index index) throws EngineException {
final boolean created;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (index.origin() == Operation.Origin.RECOVERY) {
// Don't throttle recovery operations
innerIndex(index);
created = innerIndex(index);
} else {
try (Releasable r = throttle.acquireThrottle()) {
innerIndex(index);
created = innerIndex(index);
}
}
flushNeeded = true;
@ -381,6 +382,7 @@ public class InternalEngine extends Engine {
throw new IndexFailedEngineException(shardId, index, t);
}
checkVersionMapRefresh();
return created;
}
/**
@ -410,7 +412,7 @@ public class InternalEngine extends Engine {
}
}
private void innerIndex(Index index) throws IOException {
private boolean innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
final long currentVersion;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
@ -428,17 +430,18 @@ public class InternalEngine extends Engine {
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
return false;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final boolean created;
index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
index.created(true);
created = true;
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
} else {
@ -446,7 +449,9 @@ public class InternalEngine extends Engine {
}
} else {
if (versionValue != null) {
index.created(versionValue.delete()); // we have a delete which is not GC'ed...
created = versionValue.delete(); // we have a delete which is not GC'ed...
} else {
created = false;
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs());
@ -459,6 +464,7 @@ public class InternalEngine extends Engine {
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
indexingService.postIndexUnderLock(index);
return created;
}
}

View File

@ -108,7 +108,7 @@ public class ShadowEngine extends Engine {
}
@Override
public void index(Index index) throws EngineException {
public boolean index(Index index) throws EngineException {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}

View File

@ -470,7 +470,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates, autoGeneratedId);
}
public ParsedDocument create(Engine.Create create) throws ElasticsearchException {
public void create(Engine.Create create) throws ElasticsearchException {
writeAllowed(create.origin());
create = indexingService.preCreate(create);
mapperAnalyzer.setType(create.type());
@ -485,7 +485,6 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
indexingService.postCreate(create);
return create.parsedDoc();
}
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
@ -501,22 +500,27 @@ public class IndexShard extends AbstractIndexShardComponent {
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates);
}
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
/**
* Index a document and return whether it was created, as opposed to just
* updated.
*/
public boolean index(Engine.Index index) throws ElasticsearchException {
writeAllowed(index.origin());
index = indexingService.preIndex(index);
mapperAnalyzer.setType(index.type());
final boolean created;
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
engine().index(index);
created = engine().index(index);
index.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postIndex(index, ex);
throw ex;
}
indexingService.postIndex(index);
return index.parsedDoc();
return created;
}
public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {

View File

@ -1354,34 +1354,29 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void testBasicCreatedFlag() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertFalse(index.created());
assertFalse(engine.index(index));
engine.delete(new Engine.Delete(null, "1", newUid("1")));
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
}
@Test
public void testCreatedFlagAfterFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
engine.delete(new Engine.Delete(null, "1", newUid("1")));
engine.flush();
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
}
private static class MockAppender extends AppenderSkeleton {