move index/delete refresh to happen outside of the engine

This commit is contained in:
kimchy 2011-03-29 01:28:37 +02:00
parent 53935f078a
commit 95e36a073a
5 changed files with 51 additions and 44 deletions

View File

@ -152,10 +152,18 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.versionType(request.versionType()) .versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY); .origin(Engine.Operation.Origin.PRIMARY);
delete.refresh(request.refresh());
indexShard.delete(delete); indexShard.delete(delete);
// update the request with teh version so it will go to the replicas // update the request with teh version so it will go to the replicas
request.version(delete.version()); request.version(delete.version());
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
} catch (Exception e) {
// ignore
}
}
DeleteResponse response = new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound()); DeleteResponse response = new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound());
return new PrimaryResponse<DeleteResponse>(response, null); return new PrimaryResponse<DeleteResponse>(response, null);
} }
@ -165,7 +173,15 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA); .origin(Engine.Operation.Origin.REPLICA);
delete.refresh(request.refresh());
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
} catch (Exception e) {
// ignore
}
}
indexShard.delete(delete); indexShard.delete(delete);
} }

View File

@ -75,10 +75,19 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY); .origin(Engine.Operation.Origin.PRIMARY);
delete.refresh(request.refresh());
indexShard.delete(delete); indexShard.delete(delete);
// update the version to happen on the replicas // update the version to happen on the replicas
request.version(delete.version()); request.version(delete.version());
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
} catch (Exception e) {
// ignore
}
}
ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.notFound()); ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.notFound());
return new PrimaryResponse<ShardDeleteResponse>(response, null); return new PrimaryResponse<ShardDeleteResponse>(response, null);
} }
@ -88,8 +97,16 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
IndexShard indexShard = indexShard(shardRequest); IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()) Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA); .origin(Engine.Operation.Origin.REPLICA);
delete.refresh(request.refresh());
indexShard.delete(delete); indexShard.delete(delete);
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
} catch (Exception e) {
// ignore
}
}
} }
@Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteRequest request) { @Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteRequest request) {

View File

@ -181,7 +181,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.version(request.version()) .version(request.version())
.versionType(request.versionType()) .versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY); .origin(Engine.Operation.Origin.PRIMARY);
index.refresh(request.refresh());
doc = indexShard.index(index); doc = indexShard.index(index);
version = index.version(); version = index.version();
} else { } else {
@ -189,10 +188,16 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.version(request.version()) .version(request.version())
.versionType(request.versionType()) .versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY); .origin(Engine.Operation.Origin.PRIMARY);
create.refresh(request.refresh());
doc = indexShard.create(create); doc = indexShard.create(create);
version = create.version(); version = create.version();
} }
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
} catch (Exception e) {
// ignore
}
}
if (doc.mappersAdded()) { if (doc.mappersAdded()) {
updateMappingOnMaster(request); updateMappingOnMaster(request);
} }
@ -225,15 +230,20 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
Engine.Index index = indexShard.prepareIndex(sourceToParse) Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version()) .version(request.version())
.origin(Engine.Operation.Origin.REPLICA); .origin(Engine.Operation.Origin.REPLICA);
index.refresh(request.refresh());
indexShard.index(index); indexShard.index(index);
} else { } else {
Engine.Create create = indexShard.prepareCreate(sourceToParse) Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version()) .version(request.version())
.origin(Engine.Operation.Origin.REPLICA); .origin(Engine.Operation.Origin.REPLICA);
create.refresh(request.refresh());
indexShard.create(create); indexShard.create(create);
} }
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh(false));
} catch (Exception e) {
// ignore
}
}
} }
private void updateMappingOnMaster(final IndexRequest request) { private void updateMappingOnMaster(final IndexRequest request) {

View File

@ -290,7 +290,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Create implements Operation { static class Create implements Operation {
private final Term uid; private final Term uid;
private final ParsedDocument doc; private final ParsedDocument doc;
private boolean refresh;
private long version; private long version;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY; private Origin origin = Origin.PRIMARY;
@ -367,14 +366,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.source(); return this.doc.source();
} }
public boolean refresh() {
return refresh;
}
public void refresh(boolean refresh) {
this.refresh = refresh;
}
public UidField uidField() { public UidField uidField() {
return (UidField) doc().getFieldable(UidFieldMapper.NAME); return (UidField) doc().getFieldable(UidFieldMapper.NAME);
} }
@ -383,7 +374,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Index implements Operation { static class Index implements Operation {
private final Term uid; private final Term uid;
private final ParsedDocument doc; private final ParsedDocument doc;
private boolean refresh;
private long version; private long version;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY; private Origin origin = Origin.PRIMARY;
@ -460,14 +450,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.source(); return this.doc.source();
} }
public boolean refresh() {
return refresh;
}
public void refresh(boolean refresh) {
this.refresh = refresh;
}
public UidField uidField() { public UidField uidField() {
return (UidField) doc().getFieldable(UidFieldMapper.NAME); return (UidField) doc().getFieldable(UidFieldMapper.NAME);
} }
@ -477,7 +459,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final String type; private final String type;
private final String id; private final String id;
private final Term uid; private final Term uid;
private boolean refresh;
private long version; private long version;
private VersionType versionType = VersionType.INTERNAL; private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY; private Origin origin = Origin.PRIMARY;
@ -514,14 +495,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.uid; return this.uid;
} }
public boolean refresh() {
return refresh;
}
public void refresh(boolean refresh) {
this.refresh = refresh;
}
public Delete version(long version) { public Delete version(long version) {
this.version = version; this.version = version;
return this; return this;

View File

@ -238,9 +238,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
innerCreate(create, writer); innerCreate(create, writer);
dirty = true; dirty = true;
possibleMergeNeeded = true; possibleMergeNeeded = true;
if (create.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) { } catch (IOException e) {
throw new CreateFailedEngineException(shardId, create, e); throw new CreateFailedEngineException(shardId, create, e);
} finally { } finally {
@ -342,9 +339,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
innerIndex(index, writer); innerIndex(index, writer);
dirty = true; dirty = true;
possibleMergeNeeded = true; possibleMergeNeeded = true;
if (index.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) { } catch (IOException e) {
throw new IndexFailedEngineException(shardId, index, e); throw new IndexFailedEngineException(shardId, index, e);
} finally { } finally {
@ -439,9 +433,6 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
innerDelete(delete, writer); innerDelete(delete, writer);
dirty = true; dirty = true;
possibleMergeNeeded = true; possibleMergeNeeded = true;
if (delete.refresh()) {
refresh(new Refresh(false));
}
} catch (IOException e) { } catch (IOException e) {
throw new DeleteFailedEngineException(shardId, delete, e); throw new DeleteFailedEngineException(shardId, delete, e);
} finally { } finally {