Merge pull request #10819 from jpountz/fix/transport_index_duplication

Reduce code duplication in TransportIndexAction/TransportShardBulkAction.
This commit is contained in:
Adrien Grand 2015-04-27 10:30:21 +02:00
commit 97ff93ff07
7 changed files with 121 additions and 163 deletions

View File

@ -359,72 +359,46 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
long version;
boolean created;
Engine.IndexingOperation op;
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
version = index.version();
op = index;
created = index.created();
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
version = create.version();
op = create;
created = true;
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
// update the version on request so it will happen on the replicas
final long version = operation.version();
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.version(version);
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
IndexResponse indexResponse = new IndexResponse(request.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, op);
return new WriteResult(indexResponse, operation);
}
private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
@ -548,23 +522,20 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse,
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
@ -172,62 +173,39 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
boolean created;
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.index(index);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.index(index);
}
} else {
indexShard.index(index);
}
version = index.version();
created = index.created();
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse,
operation = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
indexShard.create(create);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
indexShard.create(create);
}
} else {
indexShard.create(create);
}
version = create.version();
created = true;
}
final boolean created;
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");
@ -237,6 +215,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
// update the version on the request, so it will be used for the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
@ -250,22 +229,19 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexShard indexShard = indexService.shardSafe(shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
Mapping update = index.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.index(index);
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
Mapping update = create.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
indexShard.create(create);
operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_index");

View File

@ -47,6 +47,7 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -200,7 +201,7 @@ public abstract class Engine implements Closeable {
public abstract void create(Create create) throws EngineException;
public abstract void index(Index index) throws EngineException;
public abstract boolean index(Index index) throws EngineException;
public abstract void delete(Delete delete) throws EngineException;
@ -704,6 +705,12 @@ public abstract class Engine implements Closeable {
public long endTime() {
return this.endTime;
}
/**
* Execute this operation against the provided {@link IndexShard} and
* return whether the document was created.
*/
public abstract boolean execute(IndexShard shard);
}
public static final class Create extends IndexingOperation {
@ -732,10 +739,15 @@ public abstract class Engine implements Closeable {
public boolean autoGeneratedId() {
return this.autoGeneratedId;
}
@Override
public boolean execute(IndexShard shard) {
shard.create(this);
return true;
}
}
public static final class Index extends IndexingOperation {
private boolean created;
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
super(docMapper, uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
@ -754,15 +766,9 @@ public abstract class Engine implements Closeable {
return Type.INDEX;
}
/**
* @return true if object was created
*/
public boolean created() {
return created;
}
public void created(boolean created) {
this.created = created;
@Override
public boolean execute(IndexShard shard) {
return shard.index(this);
}
}

View File

@ -364,15 +364,16 @@ public class InternalEngine extends Engine {
}
@Override
public void index(Index index) throws EngineException {
public boolean index(Index index) throws EngineException {
final boolean created;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (index.origin() == Operation.Origin.RECOVERY) {
// Don't throttle recovery operations
innerIndex(index);
created = innerIndex(index);
} else {
try (Releasable r = throttle.acquireThrottle()) {
innerIndex(index);
created = innerIndex(index);
}
}
flushNeeded = true;
@ -381,6 +382,7 @@ public class InternalEngine extends Engine {
throw new IndexFailedEngineException(shardId, index, t);
}
checkVersionMapRefresh();
return created;
}
/**
@ -410,7 +412,7 @@ public class InternalEngine extends Engine {
}
}
private void innerIndex(Index index) throws IOException {
private boolean innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
final long currentVersion;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
@ -428,17 +430,18 @@ public class InternalEngine extends Engine {
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
return false;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final boolean created;
index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
index.created(true);
created = true;
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
} else {
@ -446,7 +449,9 @@ public class InternalEngine extends Engine {
}
} else {
if (versionValue != null) {
index.created(versionValue.delete()); // we have a delete which is not GC'ed...
created = versionValue.delete(); // we have a delete which is not GC'ed...
} else {
created = false;
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs());
@ -459,6 +464,7 @@ public class InternalEngine extends Engine {
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
indexingService.postIndexUnderLock(index);
return created;
}
}

View File

@ -108,7 +108,7 @@ public class ShadowEngine extends Engine {
}
@Override
public void index(Index index) throws EngineException {
public boolean index(Index index) throws EngineException {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}

View File

@ -470,7 +470,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return new Engine.Create(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates, autoGeneratedId);
}
public ParsedDocument create(Engine.Create create) throws ElasticsearchException {
public void create(Engine.Create create) throws ElasticsearchException {
writeAllowed(create.origin());
create = indexingService.preCreate(create);
mapperAnalyzer.setType(create.type());
@ -485,7 +485,6 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
indexingService.postCreate(create);
return create.parsedDoc();
}
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
@ -501,22 +500,27 @@ public class IndexShard extends AbstractIndexShardComponent {
return new Engine.Index(docMapper.v1(), docMapper.v1().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates);
}
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
/**
* Index a document and return whether it was created, as opposed to just
* updated.
*/
public boolean index(Engine.Index index) throws ElasticsearchException {
writeAllowed(index.origin());
index = indexingService.preIndex(index);
mapperAnalyzer.setType(index.type());
final boolean created;
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
engine().index(index);
created = engine().index(index);
index.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postIndex(index, ex);
throw ex;
}
indexingService.postIndex(index);
return index.parsedDoc();
return created;
}
public Engine.Delete prepareDelete(String type, String id, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {

View File

@ -1354,34 +1354,29 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void testBasicCreatedFlag() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertFalse(index.created());
assertFalse(engine.index(index));
engine.delete(new Engine.Delete(null, "1", newUid("1")));
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
}
@Test
public void testCreatedFlagAfterFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
engine.delete(new Engine.Delete(null, "1", newUid("1")));
engine.flush();
index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
assertTrue(index.created());
assertTrue(engine.index(index));
}
private static class MockAppender extends AppenderSkeleton {