Make modifying operations durable by default.
This commit makes create, update and delete operations on an index durable by default. The user has the option to opt out and use async translog flushes on a per-index basis by setting `index.translog.durability=async`. Initial benchmarks running on SSDs have shown that indexing is about 7% - 10% slower with bulk indexing compared to async translog flushes. This change is orthogonal to the transaction log sync interval and will only sync the transaction log if the operation has not yet been concurrently synced. I.e., if multiple indexing requests are submitted and one operation's sync call already persists the operations of the others, only one sync call is executed. Relates to #10933
This commit is contained in:
parent
80be0f7217
commit
aa184029b1
|
@ -148,4 +148,4 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
|
|||
public int numberOfActions() {
|
||||
return request.numberOfActions();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -81,14 +81,8 @@ public class BulkShardRequest extends ShardReplicationOperationRequest<BulkShard
|
|||
out.writeVInt(items.length);
|
||||
for (BulkItemRequest item : items) {
|
||||
if (item != null) {
|
||||
// if we are serializing to a node that is pre 1.3.3, make sure to pass null to maintain
|
||||
// the old behavior of putting null in the request to be ignored on the replicas
|
||||
if (item.isIgnoreOnReplica() && out.getVersion().before(Version.V_1_3_3)) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
item.writeTo(out);
|
||||
}
|
||||
out.writeBoolean(true);
|
||||
item.writeTo(out);
|
||||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.action.bulk;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
|
@ -41,7 +40,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
|
|||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
@ -51,14 +49,13 @@ import org.elasticsearch.index.VersionType;
|
|||
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.river.RiverIndexName;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -116,11 +113,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
@Override
|
||||
protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
|
||||
final BulkShardRequest request = shardRequest.request;
|
||||
IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
|
||||
final IndexService indexService = indicesService.indexServiceSafe(request.index());
|
||||
final IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
|
||||
|
||||
long[] preVersions = new long[request.items().length];
|
||||
VersionType[] preVersionTypes = new VersionType[request.items().length];
|
||||
Translog.Location location = null;
|
||||
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
if (item.request() instanceof IndexRequest) {
|
||||
|
@ -128,7 +126,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
preVersions[requestIndex] = indexRequest.version();
|
||||
preVersionTypes[requestIndex] = indexRequest.versionType();
|
||||
try {
|
||||
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, indexService, true);
|
||||
WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
|
||||
location = locationToSync(location, result.location);
|
||||
// add the response
|
||||
IndexResponse indexResponse = result.response();
|
||||
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
|
||||
|
@ -163,7 +162,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
|
||||
try {
|
||||
// add the response
|
||||
DeleteResponse deleteResponse = shardDeleteOperation(request, deleteRequest, indexShard).response();
|
||||
final WriteResult<DeleteResponse> writeResult = shardDeleteOperation(request, deleteRequest, indexShard);
|
||||
DeleteResponse deleteResponse = writeResult.response();
|
||||
location = locationToSync(location, writeResult.location);
|
||||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
|
||||
} catch (Throwable e) {
|
||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||
|
@ -197,15 +198,18 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
|
||||
UpdateResult updateResult;
|
||||
try {
|
||||
updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard, indexService);
|
||||
updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);
|
||||
} catch (Throwable t) {
|
||||
updateResult = new UpdateResult(null, null, false, t, null);
|
||||
}
|
||||
if (updateResult.success()) {
|
||||
if (updateResult.writeResult != null) {
|
||||
location = locationToSync(location, updateResult.writeResult.location);
|
||||
}
|
||||
switch (updateResult.result.operation()) {
|
||||
case UPSERT:
|
||||
case INDEX:
|
||||
WriteResult result = updateResult.writeResult;
|
||||
WriteResult<IndexResponse> result = updateResult.writeResult;
|
||||
IndexRequest indexRequest = updateResult.request();
|
||||
BytesReference indexSourceAsBytes = indexRequest.source();
|
||||
// add the response
|
||||
|
@ -219,7 +223,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
|
||||
break;
|
||||
case DELETE:
|
||||
DeleteResponse response = updateResult.writeResult.response();
|
||||
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
|
||||
DeleteResponse response = writeResult.response();
|
||||
DeleteRequest deleteRequest = updateResult.request();
|
||||
updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
|
||||
|
@ -297,13 +302,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
assert preVersionTypes[requestIndex] != null;
|
||||
}
|
||||
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_bulk");
|
||||
} catch (Throwable e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
processAfter(request, indexShard, location);
|
||||
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
|
||||
BulkItemRequest[] items = request.items();
|
||||
for (int i = 0; i < items.length; i++) {
|
||||
|
@ -319,26 +318,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
}
|
||||
}
|
||||
|
||||
static class WriteResult {
|
||||
|
||||
final ActionWriteResponse response;
|
||||
|
||||
WriteResult(ActionWriteResponse response) {
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
<T extends ActionWriteResponse> T response() {
|
||||
// this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica
|
||||
// request and not use it
|
||||
response.setShardInfo(new ActionWriteResponse.ShardInfo());
|
||||
return (T) response;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
|
||||
IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {
|
||||
IndexShard indexShard, boolean processed) throws Throwable {
|
||||
|
||||
// validate, if routing is required, that we got routing
|
||||
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type());
|
||||
|
@ -352,11 +333,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
|
||||
}
|
||||
|
||||
final IndexResponse response = executeIndexRequestOnPrimary(request, indexRequest, indexShard);
|
||||
return new WriteResult(response);
|
||||
return executeIndexRequestOnPrimary(request, indexRequest, indexShard);
|
||||
}
|
||||
|
||||
private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
|
||||
private WriteResult<DeleteResponse> shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
|
||||
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY);
|
||||
indexShard.delete(delete);
|
||||
// update the request with the version so it will go to the replicas
|
||||
|
@ -366,7 +346,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
|
||||
|
||||
DeleteResponse deleteResponse = new DeleteResponse(request.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found());
|
||||
return new WriteResult(deleteResponse);
|
||||
return new WriteResult(deleteResponse, delete.getTranslogLocation());
|
||||
}
|
||||
|
||||
static class UpdateResult {
|
||||
|
@ -422,14 +402,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
|
||||
}
|
||||
|
||||
private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard, IndexService indexService) {
|
||||
private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
|
||||
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
|
||||
switch (translate.operation()) {
|
||||
case UPSERT:
|
||||
case INDEX:
|
||||
IndexRequest indexRequest = translate.action();
|
||||
try {
|
||||
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, indexService, false);
|
||||
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false);
|
||||
return new UpdateResult(translate, indexRequest, result);
|
||||
} catch (Throwable t) {
|
||||
t = ExceptionsHelper.unwrapCause(t);
|
||||
|
@ -466,6 +446,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
IndexShard indexShard = indexService.shardSafe(shardId.id());
|
||||
Translog.Location location = null;
|
||||
for (int i = 0; i < request.items().length; i++) {
|
||||
BulkItemRequest item = request.items()[i];
|
||||
if (item == null || item.isIgnoreOnReplica()) {
|
||||
|
@ -491,6 +472,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
|
||||
}
|
||||
operation.execute(indexShard);
|
||||
location = locationToSync(location, operation.getTranslogLocation());
|
||||
} catch (Throwable e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
|
@ -503,6 +485,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
try {
|
||||
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
|
||||
indexShard.delete(delete);
|
||||
location = locationToSync(location, delete.getTranslogLocation());
|
||||
} catch (Throwable e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
// so we will fail the shard
|
||||
|
@ -515,6 +498,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
}
|
||||
}
|
||||
|
||||
processAfter(request, indexShard, location);
|
||||
}
|
||||
|
||||
private void processAfter(BulkShardRequest request, IndexShard indexShard, Translog.Location location) {
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_bulk");
|
||||
|
@ -522,6 +509,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
|
||||
indexShard.sync(location);
|
||||
}
|
||||
}
|
||||
|
||||
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
|
||||
|
@ -535,4 +526,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
// log?
|
||||
}
|
||||
}
|
||||
|
||||
private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
|
||||
/* here we are moving forward in the translog with each operation. Under the hood
|
||||
* this might cross translog files which is ok since from the user perspective
|
||||
* the translog is like a tape where only the highest location needs to be fsynced
|
||||
* in order to sync all previous locations even though they are not in the same file.
|
||||
* When the translog rolls over files the previous file is fsynced on after closing if needed.*/
|
||||
assert next != null : "next operation can't be null";
|
||||
assert current == null || current.compareTo(next) < 0 : "translog locations are not increasing";
|
||||
return next;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,12 +19,14 @@
|
|||
|
||||
package org.elasticsearch.action.delete;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
|
||||
|
@ -41,11 +43,14 @@ import org.elasticsearch.index.VersionType;
|
|||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Performs the delete operation.
|
||||
*/
|
||||
|
@ -140,13 +145,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
|
|||
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_delete");
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
processAfter(request, indexShard, delete.getTranslogLocation());
|
||||
|
||||
DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found());
|
||||
return new Tuple<>(response, shardRequest.request);
|
||||
|
@ -158,14 +157,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
|
|||
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
|
||||
|
||||
indexShard.delete(delete);
|
||||
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_delete");
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
processAfter(request, indexShard, delete.getTranslogLocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,4 +165,18 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
|
|||
return clusterService.operationRouting()
|
||||
.deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
|
||||
}
|
||||
|
||||
private void processAfter(DeleteRequest request, IndexShard indexShard, Translog.Location location) {
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_delete");
|
||||
} catch (Throwable e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
|
||||
indexShard.sync(location);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.index;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.RoutingMissingException;
|
||||
|
@ -45,11 +46,14 @@ import org.elasticsearch.index.mapper.Mapping;
|
|||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Performs the index operation.
|
||||
* <p/>
|
||||
|
@ -166,16 +170,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
|
||||
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
|
||||
|
||||
final IndexResponse response = executeIndexRequestOnPrimary(null, request, indexShard);
|
||||
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_index");
|
||||
} catch (Throwable e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard);
|
||||
final IndexResponse response = result.response;
|
||||
final Translog.Location location = result.location;
|
||||
processAfter(request, indexShard, location);
|
||||
return new Tuple<>(response, shardRequest.request);
|
||||
}
|
||||
|
||||
|
@ -198,12 +196,20 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
|
||||
}
|
||||
operation.execute(indexShard);
|
||||
processAfter(request, indexShard, operation.getTranslogLocation());
|
||||
}
|
||||
|
||||
private void processAfter(IndexRequest request, IndexShard indexShard, Translog.Location location) {
|
||||
if (request.refresh()) {
|
||||
try {
|
||||
indexShard.refresh("refresh_flag_index");
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
|
||||
indexShard.sync(location);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.elasticsearch.index.mapper.SourceToParse;
|
|||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -191,6 +192,26 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
return false;
|
||||
}
|
||||
|
||||
protected static class WriteResult<T extends ActionWriteResponse> {
|
||||
|
||||
public final T response;
|
||||
public final Translog.Location location;
|
||||
|
||||
public WriteResult(T response, Translog.Location location) {
|
||||
this.response = response;
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends ActionWriteResponse> T response() {
|
||||
// this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica
|
||||
// request and not use it
|
||||
response.setShardInfo(new ActionWriteResponse.ShardInfo());
|
||||
return (T) response;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class OperationTransportHandler implements TransportRequestHandler<Request> {
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
|
@ -1048,7 +1069,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
|
||||
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
|
||||
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
|
||||
protected final IndexResponse executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
|
||||
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
|
||||
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
final boolean created;
|
||||
|
@ -1087,6 +1108,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
|
||||
return new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created);
|
||||
return new WriteResult(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -605,6 +605,7 @@ public abstract class Engine implements Closeable {
|
|||
private final VersionType versionType;
|
||||
private final Origin origin;
|
||||
private final boolean canHaveDuplicates;
|
||||
private Translog.Location location;
|
||||
|
||||
private final long startTime;
|
||||
private long endTime;
|
||||
|
@ -670,6 +671,14 @@ public abstract class Engine implements Closeable {
|
|||
this.doc.version().setLongValue(version);
|
||||
}
|
||||
|
||||
public void setTranslogLocation(Translog.Location location) {
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
public Translog.Location getTranslogLocation() {
|
||||
return this.location;
|
||||
}
|
||||
|
||||
public VersionType versionType() {
|
||||
return this.versionType;
|
||||
}
|
||||
|
@ -785,6 +794,7 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
private final long startTime;
|
||||
private long endTime;
|
||||
private Translog.Location location;
|
||||
|
||||
public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) {
|
||||
this.type = type;
|
||||
|
@ -864,6 +874,14 @@ public abstract class Engine implements Closeable {
|
|||
public long endTime() {
|
||||
return this.endTime;
|
||||
}
|
||||
|
||||
public void setTranslogLocation(Translog.Location location) {
|
||||
this.location = location;
|
||||
}
|
||||
|
||||
public Translog.Location getTranslogLocation() {
|
||||
return this.location;
|
||||
}
|
||||
}
|
||||
|
||||
public static class DeleteByQuery {
|
||||
|
|
|
@ -398,7 +398,7 @@ public class InternalEngine extends Engine {
|
|||
Translog.Location translogLocation = translog.add(new Translog.Create(create));
|
||||
|
||||
versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
|
||||
|
||||
create.setTranslogLocation(translogLocation);
|
||||
indexingService.postCreateUnderLock(create);
|
||||
}
|
||||
|
||||
|
@ -501,7 +501,7 @@ public class InternalEngine extends Engine {
|
|||
Translog.Location translogLocation = translog.add(new Translog.Index(index));
|
||||
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
|
||||
|
||||
index.setTranslogLocation(translogLocation);
|
||||
indexingService.postIndexUnderLock(index);
|
||||
return created;
|
||||
}
|
||||
|
@ -571,7 +571,7 @@ public class InternalEngine extends Engine {
|
|||
delete.updateVersion(updatedVersion, found);
|
||||
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
||||
versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), translogLocation));
|
||||
|
||||
delete.setTranslogLocation(translogLocation);
|
||||
indexingService.postDeleteUnderLock(delete);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,6 +118,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
|
|||
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
|
||||
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME);
|
||||
indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH);
|
||||
indexDynamicSettings.addDynamicSetting(Translog.INDEX_TRANSLOG_DURABILITY);
|
||||
indexDynamicSettings.addDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED);
|
||||
indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN);
|
||||
}
|
||||
|
|
|
@ -116,6 +116,8 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
@ -1352,4 +1354,24 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
public int getOperationsCount() {
|
||||
return indexShardOperationCounter.refCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncs the given location with the underlying storage unless already synced.
|
||||
*/
|
||||
public void sync(Translog.Location location) {
|
||||
final Engine engine = engine();
|
||||
try {
|
||||
engine.getTranslog().ensureSynced(location);
|
||||
} catch (IOException ex) { // if this fails we are in deep shit - fail the request
|
||||
logger.debug("failed to sync translog", ex);
|
||||
throw new ElasticsearchException("failed to sync translog", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current translog durability mode
|
||||
*/
|
||||
public Translog.Durabilty getTranslogDurability() {
|
||||
return engine().getTranslog().getDurabilty();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,66 +32,49 @@ import java.nio.ByteBuffer;
|
|||
*/
|
||||
public final class BufferingTranslogFile extends TranslogFile {
|
||||
|
||||
private volatile int operationCounter;
|
||||
private volatile long lastPosition;
|
||||
private volatile long lastWrittenPosition;
|
||||
|
||||
private volatile long lastSyncPosition = 0;
|
||||
|
||||
private byte[] buffer;
|
||||
private int bufferCount;
|
||||
private WrapperOutputStream bufferOs = new WrapperOutputStream();
|
||||
|
||||
/* the total offset of this file including the bytes written to the file as well as into the buffer */
|
||||
private volatile long totalOffset;
|
||||
|
||||
public BufferingTranslogFile(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException {
|
||||
super(shardId, id, channelReference);
|
||||
this.buffer = new byte[bufferSize];
|
||||
final TranslogStream stream = this.channelReference.stream();
|
||||
int headerSize = stream.writeHeader(channelReference.channel());
|
||||
this.lastPosition += headerSize;
|
||||
this.lastWrittenPosition += headerSize;
|
||||
this.lastSyncPosition += headerSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return operationCounter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sizeInBytes() {
|
||||
return lastWrittenPosition;
|
||||
this.totalOffset = writtenOffset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Location add(BytesReference data) throws IOException {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
operationCounter++;
|
||||
long position = lastPosition;
|
||||
final long offset = totalOffset;
|
||||
if (data.length() >= buffer.length) {
|
||||
flushBuffer();
|
||||
flush();
|
||||
// we use the channel to write, since on windows, writing to the RAF might not be reflected
|
||||
// when reading through the channel
|
||||
data.writeTo(channelReference.channel());
|
||||
lastWrittenPosition += data.length();
|
||||
lastPosition += data.length();
|
||||
return new Translog.Location(id, position, data.length());
|
||||
writtenOffset += data.length();
|
||||
totalOffset += data.length();
|
||||
return new Translog.Location(id, offset, data.length());
|
||||
}
|
||||
if (data.length() > buffer.length - bufferCount) {
|
||||
flushBuffer();
|
||||
flush();
|
||||
}
|
||||
data.writeTo(bufferOs);
|
||||
lastPosition += data.length();
|
||||
return new Translog.Location(id, position, data.length());
|
||||
totalOffset += data.length();
|
||||
return new Translog.Location(id, offset, data.length());
|
||||
}
|
||||
}
|
||||
|
||||
private void flushBuffer() throws IOException {
|
||||
protected final void flush() throws IOException {
|
||||
assert writeLock.isHeldByCurrentThread();
|
||||
if (bufferCount > 0) {
|
||||
// we use the channel to write, since on windows, writing to the RAF might not be reflected
|
||||
// when reading through the channel
|
||||
Channels.writeToChannel(buffer, 0, bufferCount, channelReference.channel());
|
||||
lastWrittenPosition += bufferCount;
|
||||
writtenOffset += bufferCount;
|
||||
bufferCount = 0;
|
||||
}
|
||||
}
|
||||
|
@ -99,8 +82,8 @@ public final class BufferingTranslogFile extends TranslogFile {
|
|||
@Override
|
||||
protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
if (position >= lastWrittenPosition) {
|
||||
System.arraycopy(buffer, (int) (position - lastWrittenPosition),
|
||||
if (position >= writtenOffset) {
|
||||
System.arraycopy(buffer, (int) (position - writtenOffset),
|
||||
targetBuffer.array(), targetBuffer.position(), targetBuffer.limit());
|
||||
return;
|
||||
}
|
||||
|
@ -110,26 +93,9 @@ public final class BufferingTranslogFile extends TranslogFile {
|
|||
Channels.readFromFileChannelWithEofException(channelReference.channel(), position, targetBuffer);
|
||||
}
|
||||
|
||||
public ChannelImmutableReader immutableReader() throws TranslogException {
|
||||
if (channelReference.tryIncRef()) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
flushBuffer();
|
||||
ChannelImmutableReader reader = new ChannelImmutableReader(this.id, channelReference, lastWrittenPosition, operationCounter);
|
||||
channelReference.incRef(); // for new reader
|
||||
return reader;
|
||||
} catch (Exception e) {
|
||||
throw new TranslogException(shardId, "exception while creating an immutable reader", e);
|
||||
} finally {
|
||||
channelReference.decRef();
|
||||
}
|
||||
} else {
|
||||
throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean syncNeeded() {
|
||||
return lastPosition != lastSyncPosition;
|
||||
return totalOffset != lastSyncedOffset;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -138,21 +104,12 @@ public final class BufferingTranslogFile extends TranslogFile {
|
|||
return;
|
||||
}
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
flushBuffer();
|
||||
lastSyncPosition = lastPosition;
|
||||
flush();
|
||||
lastSyncedOffset = totalOffset;
|
||||
}
|
||||
channelReference.channel().force(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws IOException {
|
||||
try {
|
||||
sync();
|
||||
} finally {
|
||||
super.doClose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reuse(TranslogFile other) {
|
||||
if (!(other instanceof BufferingTranslogFile)) {
|
||||
|
@ -160,7 +117,7 @@ public final class BufferingTranslogFile extends TranslogFile {
|
|||
}
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
try {
|
||||
flushBuffer();
|
||||
flush();
|
||||
this.buffer = ((BufferingTranslogFile) other).buffer;
|
||||
} catch (IOException e) {
|
||||
throw new TranslogException(shardId, "failed to flush", e);
|
||||
|
@ -173,7 +130,7 @@ public final class BufferingTranslogFile extends TranslogFile {
|
|||
if (this.buffer.length == bufferSize) {
|
||||
return;
|
||||
}
|
||||
flushBuffer();
|
||||
flush();
|
||||
this.buffer = new byte[bufferSize];
|
||||
} catch (IOException e) {
|
||||
throw new TranslogException(shardId, "failed to flush", e);
|
||||
|
@ -194,5 +151,4 @@ public final class BufferingTranslogFile extends TranslogFile {
|
|||
bufferCount += len;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,124 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.translog;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Channels;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public final class SimpleTranslogFile extends TranslogFile {
|
||||
|
||||
private volatile int operationCounter = 0;
|
||||
private volatile long lastPosition = 0;
|
||||
private volatile long lastWrittenPosition = 0;
|
||||
private volatile long lastSyncPosition = 0;
|
||||
|
||||
public SimpleTranslogFile(ShardId shardId, long id, ChannelReference channelReference) throws IOException {
|
||||
super(shardId, id, channelReference);
|
||||
int headerSize = this.channelReference.stream().writeHeader(channelReference.channel());
|
||||
this.lastPosition += headerSize;
|
||||
this.lastWrittenPosition += headerSize;
|
||||
this.lastSyncPosition += headerSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return operationCounter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sizeInBytes() {
|
||||
return lastWrittenPosition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Translog.Location add(BytesReference data) throws IOException {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
long position = lastPosition;
|
||||
data.writeTo(channelReference.channel());
|
||||
lastPosition = lastPosition + data.length();
|
||||
lastWrittenPosition = lastWrittenPosition + data.length();
|
||||
operationCounter = operationCounter + 1;
|
||||
return new Translog.Location(id, position, data.length());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
Channels.readFromFileChannelWithEofException(channelReference.channel(), position, buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doClose() throws IOException {
|
||||
try {
|
||||
sync();
|
||||
} finally {
|
||||
super.doClose();
|
||||
}
|
||||
}
|
||||
|
||||
public ChannelImmutableReader immutableReader() throws TranslogException {
|
||||
if (channelReference.tryIncRef()) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
ChannelImmutableReader reader = new ChannelImmutableReader(this.id, channelReference, lastWrittenPosition, operationCounter);
|
||||
channelReference.incRef(); // for the new object
|
||||
return reader;
|
||||
} finally {
|
||||
channelReference.decRef();
|
||||
}
|
||||
} else {
|
||||
throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] channel ref count");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean syncNeeded() {
|
||||
return lastWrittenPosition != lastSyncPosition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
// check if we really need to sync here...
|
||||
if (!syncNeeded()) {
|
||||
return;
|
||||
}
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
lastSyncPosition = lastWrittenPosition;
|
||||
channelReference.channel().force(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reuse(TranslogFile other) {
|
||||
// nothing to do there
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBufferSize(int bufferSize) throws TranslogException {
|
||||
// nothing to do here...
|
||||
}
|
||||
}
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -73,6 +74,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
|
||||
public static ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb");
|
||||
public static final String TRANSLOG_ID_KEY = "translog_id";
|
||||
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
|
||||
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
|
||||
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
|
||||
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
|
||||
|
@ -80,6 +82,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
static final Pattern PARSE_ID_PATTERN = Pattern.compile(TRANSLOG_FILE_PREFIX + "(\\d+)(\\.recovering)?$");
|
||||
private final TimeValue syncInterval;
|
||||
private volatile ScheduledFuture<?> syncScheduler;
|
||||
private volatile Durabilty durabilty = Durabilty.REQUEST;
|
||||
|
||||
|
||||
// this is a concurrent set and is not protected by any of the locks. The main reason
|
||||
|
@ -95,6 +98,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
logger.info("updating type from [{}] to [{}]", Translog.this.type, type);
|
||||
Translog.this.type = type;
|
||||
}
|
||||
|
||||
final Durabilty durabilty = Durabilty.getFromSettings(logger, settings, Translog.this.durabilty);
|
||||
if (durabilty != Translog.this.durabilty) {
|
||||
logger.info("updating durability from [{}] to [{}]", Translog.this.durabilty, durabilty);
|
||||
Translog.this.durabilty = durabilty;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,7 +149,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
readLock = new ReleasableLock(rwl.readLock());
|
||||
writeLock = new ReleasableLock(rwl.writeLock());
|
||||
|
||||
this.durabilty = Durabilty.getFromSettings(logger, indexSettings, durabilty);
|
||||
this.indexSettingsService = indexSettingsService;
|
||||
this.bigArrays = bigArrays;
|
||||
this.location = location;
|
||||
|
@ -494,10 +503,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
*/
|
||||
public void sync() throws IOException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
if (closed.get()) {
|
||||
return;
|
||||
if (closed.get() == false) {
|
||||
current.sync();
|
||||
}
|
||||
current.sync();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -512,6 +520,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
return TRANSLOG_FILE_PREFIX + translogId;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Ensures that the given location has be synced / written to the underlying storage.
|
||||
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
|
||||
*/
|
||||
public boolean ensureSynced(Location location) throws IOException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
if (location.translogId == current.id) { // if we have a new one it's already synced
|
||||
return current.syncUpTo(location.translogLocation + location.size);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* return stats
|
||||
*/
|
||||
|
@ -547,6 +569,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* a view into the translog, capturing all translog file at the moment of creation
|
||||
* and updated with any future translog.
|
||||
|
@ -676,7 +699,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
public static class Location implements Accountable {
|
||||
public static class Location implements Accountable, Comparable<Location> {
|
||||
|
||||
public final long translogId;
|
||||
public final long translogLocation;
|
||||
|
@ -702,6 +725,35 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
public String toString() {
|
||||
return "[id: " + translogId + ", location: " + translogLocation + ", size: " + size + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Location o) {
|
||||
if (translogId == o.translogId) {
|
||||
return Long.compare(translogLocation, o.translogLocation);
|
||||
}
|
||||
return Long.compare(translogId, o.translogId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Location location = (Location) o;
|
||||
|
||||
if (translogId != location.translogId) return false;
|
||||
if (translogLocation != location.translogLocation) return false;
|
||||
return size == location.size;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = (int) (translogId ^ (translogId >>> 32));
|
||||
result = 31 * result + (int) (translogLocation ^ (translogLocation >>> 32));
|
||||
result = 31 * result + size;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1430,4 +1482,32 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current durability mode of this translog.
|
||||
*/
|
||||
public Durabilty getDurabilty() {
|
||||
return durabilty;
|
||||
}
|
||||
|
||||
public enum Durabilty {
|
||||
/**
|
||||
* Async durability - translogs are synced based on a time interval.
|
||||
*/
|
||||
ASYNC,
|
||||
/**
|
||||
* Request durability - translogs are synced for each high levle request (bulk, index, delete)
|
||||
*/
|
||||
REQUEST;
|
||||
|
||||
public static Durabilty getFromSettings(ESLogger logger, Settings settings, Durabilty defaultValue) {
|
||||
final String value = settings.get(INDEX_TRANSLOG_DURABILITY, defaultValue.name());
|
||||
try {
|
||||
return valueOf(value.toUpperCase(Locale.ROOT));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
logger.warn("Can't apply {} illegal value: {} using {} instead, use one of: {}", INDEX_TRANSLOG_DURABILITY, value, defaultValue, Arrays.toString(values()));
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.translog;
|
|||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Channels;
|
||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
|
@ -29,18 +30,28 @@ import java.nio.ByteBuffer;
|
|||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public abstract class TranslogFile extends ChannelReader {
|
||||
public class TranslogFile extends ChannelReader {
|
||||
|
||||
protected final ShardId shardId;
|
||||
protected final ReleasableLock readLock;
|
||||
protected final ReleasableLock writeLock;
|
||||
/* the offset in bytes that was written when the file was last synced*/
|
||||
protected volatile long lastSyncedOffset;
|
||||
/* the number of translog operations written to this file */
|
||||
protected volatile int operationCounter;
|
||||
/* the offset in bytes written to the file */
|
||||
protected volatile long writtenOffset;
|
||||
|
||||
public TranslogFile(ShardId shardId, long id, ChannelReference channelReference) {
|
||||
public TranslogFile(ShardId shardId, long id, ChannelReference channelReference) throws IOException {
|
||||
super(id, channelReference);
|
||||
this.shardId = shardId;
|
||||
ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
readLock = new ReleasableLock(rwl.readLock());
|
||||
writeLock = new ReleasableLock(rwl.writeLock());
|
||||
final TranslogStream stream = this.channelReference.stream();
|
||||
int headerSize = stream.writeHeader(channelReference.channel());
|
||||
this.writtenOffset += headerSize;
|
||||
this.lastSyncedOffset += headerSize;
|
||||
}
|
||||
|
||||
|
||||
|
@ -49,7 +60,7 @@ public abstract class TranslogFile extends ChannelReader {
|
|||
SIMPLE() {
|
||||
@Override
|
||||
public TranslogFile create(ShardId shardId, long id, ChannelReference channelReference, int bufferSize) throws IOException {
|
||||
return new SimpleTranslogFile(shardId, id, channelReference);
|
||||
return new TranslogFile(shardId, id, channelReference);
|
||||
}
|
||||
},
|
||||
BUFFERED() {
|
||||
|
@ -73,25 +84,58 @@ public abstract class TranslogFile extends ChannelReader {
|
|||
|
||||
|
||||
/** add the given bytes to the translog and return the location they were written at */
|
||||
public abstract Translog.Location add(BytesReference data) throws IOException;
|
||||
public Translog.Location add(BytesReference data) throws IOException {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
long position = writtenOffset;
|
||||
data.writeTo(channelReference.channel());
|
||||
writtenOffset = writtenOffset + data.length();
|
||||
operationCounter = operationCounter + 1;
|
||||
return new Translog.Location(id, position, data.length());
|
||||
}
|
||||
}
|
||||
|
||||
/** reuse resources from another translog file, which is guaranteed not to be used anymore */
|
||||
public abstract void reuse(TranslogFile other) throws TranslogException;
|
||||
public void reuse(TranslogFile other) throws TranslogException {}
|
||||
|
||||
/** change the size of the internal buffer if relevant */
|
||||
public abstract void updateBufferSize(int bufferSize) throws TranslogException;
|
||||
public void updateBufferSize(int bufferSize) throws TranslogException {}
|
||||
|
||||
/** write all buffered ops to disk and fsync file */
|
||||
public abstract void sync() throws IOException;
|
||||
public void sync() throws IOException {
|
||||
// check if we really need to sync here...
|
||||
if (syncNeeded()) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
lastSyncedOffset = writtenOffset;
|
||||
channelReference.channel().force(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** returns true if there are buffered ops */
|
||||
public abstract boolean syncNeeded();
|
||||
public boolean syncNeeded() {
|
||||
return writtenOffset != lastSyncedOffset; // by default nothing is buffered
|
||||
}
|
||||
|
||||
@Override
|
||||
public int totalOperations() {
|
||||
return operationCounter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sizeInBytes() {
|
||||
return writtenOffset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelSnapshot newSnapshot() {
|
||||
return new ChannelSnapshot(immutableReader());
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes the buffer if the translog is buffered.
|
||||
*/
|
||||
protected void flush() throws IOException {}
|
||||
|
||||
/**
|
||||
* returns a new reader that follows the current writes (most importantly allows making
|
||||
* repeated snapshots that includes new content)
|
||||
|
@ -112,7 +156,22 @@ public abstract class TranslogFile extends ChannelReader {
|
|||
|
||||
|
||||
/** returns a new immutable reader which only exposes the current written operation * */
|
||||
abstract public ChannelImmutableReader immutableReader();
|
||||
public ChannelImmutableReader immutableReader() throws TranslogException {
|
||||
if (channelReference.tryIncRef()) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
flush();
|
||||
ChannelImmutableReader reader = new ChannelImmutableReader(this.id, channelReference, writtenOffset, operationCounter);
|
||||
channelReference.incRef(); // for new reader
|
||||
return reader;
|
||||
} catch (Exception e) {
|
||||
throw new TranslogException(shardId, "exception while creating an immutable reader", e);
|
||||
} finally {
|
||||
channelReference.decRef();
|
||||
}
|
||||
} else {
|
||||
throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count");
|
||||
}
|
||||
}
|
||||
|
||||
boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(location.size);
|
||||
|
@ -150,4 +209,32 @@ public abstract class TranslogFile extends ChannelReader {
|
|||
return TranslogFile.this.newSnapshot();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncs the translog up to at least the given offset unless already synced
|
||||
* @return <code>true</code> if this call caused an actual sync operation
|
||||
*/
|
||||
public boolean syncUpTo(long offset) throws IOException {
|
||||
if (lastSyncedOffset < offset) {
|
||||
sync();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void doClose() throws IOException {
|
||||
try {
|
||||
sync();
|
||||
} finally {
|
||||
super.doClose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
Channels.readFromFileChannelWithEofException(channelReference.channel(), position, buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.translog;
|
|||
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -33,6 +34,8 @@ import org.elasticsearch.index.shard.*;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
|
@ -53,7 +56,6 @@ public class TranslogService extends AbstractIndexShardComponent implements Clos
|
|||
private final ThreadPool threadPool;
|
||||
private final IndexSettingsService indexSettingsService;
|
||||
private final IndexShard indexShard;
|
||||
private volatile Translog translog;
|
||||
|
||||
private volatile TimeValue interval;
|
||||
private volatile int flushThresholdOperations;
|
||||
|
@ -75,7 +77,6 @@ public class TranslogService extends AbstractIndexShardComponent implements Clos
|
|||
this.flushThresholdPeriod = indexSettings.getAsTime(INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, TimeValue.timeValueMinutes(30));
|
||||
this.interval = indexSettings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, timeValueMillis(5000));
|
||||
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
|
||||
|
||||
logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
|
||||
|
||||
this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());
|
||||
|
@ -141,12 +142,11 @@ public class TranslogService extends AbstractIndexShardComponent implements Clos
|
|||
reschedule();
|
||||
return;
|
||||
}
|
||||
|
||||
if (indexShard.engine().getTranslog() == null) {
|
||||
Translog translog = indexShard.engine().getTranslog();
|
||||
if (translog == null) {
|
||||
reschedule();
|
||||
return;
|
||||
}
|
||||
|
||||
int currentNumberOfOperations = translog.totalOperations();
|
||||
if (currentNumberOfOperations == 0) {
|
||||
reschedule();
|
||||
|
|
|
@ -344,7 +344,7 @@ public class BulkTests extends ElasticsearchIntegrationTest {
|
|||
);
|
||||
}
|
||||
response = builder.execute().actionGet();
|
||||
assertThat(response.hasFailures(), equalTo(false));
|
||||
assertThat(response.buildFailureMessage(), response.hasFailures(), equalTo(false));
|
||||
assertThat(response.getItems().length, equalTo(numDocs));
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
assertThat(response.getItems()[i].getItemId(), equalTo(i));
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.logging.ESLogger;
|
|||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
|
||||
import org.junit.Test;
|
||||
|
@ -39,6 +40,7 @@ import java.util.concurrent.ExecutionException;
|
|||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
|
@ -237,4 +239,42 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
Path... shardPaths) throws IOException {
|
||||
ShardStateMetaData.FORMAT.write(shardStateMetaData, shardStateMetaData.version, shardPaths);
|
||||
}
|
||||
|
||||
public void testDurableFlagHasEffect() {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
client().prepareIndex("test", "bar", "1").setSource("{}").get();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService("test");
|
||||
IndexShard shard = test.shard(0);
|
||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
||||
assertFalse(shard.engine().getTranslog().syncNeeded());
|
||||
setDurability(shard, Translog.Durabilty.ASYNC);
|
||||
client().prepareIndex("test", "bar", "2").setSource("{}").get();
|
||||
assertTrue(shard.engine().getTranslog().syncNeeded());
|
||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
||||
client().prepareDelete("test", "bar", "1").get();
|
||||
assertFalse(shard.engine().getTranslog().syncNeeded());
|
||||
|
||||
setDurability(shard, Translog.Durabilty.ASYNC);
|
||||
client().prepareDelete("test", "bar", "2").get();
|
||||
assertTrue(shard.engine().getTranslog().syncNeeded());
|
||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
||||
assertNoFailures(client().prepareBulk()
|
||||
.add(client().prepareIndex("test", "bar", "3").setSource("{}"))
|
||||
.add(client().prepareDelete("test", "bar", "1")).get());
|
||||
assertFalse(shard.engine().getTranslog().syncNeeded());
|
||||
|
||||
setDurability(shard, Translog.Durabilty.ASYNC);
|
||||
assertNoFailures(client().prepareBulk()
|
||||
.add(client().prepareIndex("test", "bar", "4").setSource("{}"))
|
||||
.add(client().prepareDelete("test", "bar", "3")).get());
|
||||
setDurability(shard, Translog.Durabilty.REQUEST);
|
||||
assertTrue(shard.engine().getTranslog().syncNeeded());
|
||||
}
|
||||
|
||||
private void setDurability(IndexShard shard, Translog.Durabilty durabilty) {
|
||||
client().admin().indices().prepareUpdateSettings(shard.shardId.getIndex()).setSettings(settingsBuilder().put(Translog.INDEX_TRANSLOG_DURABILITY, durabilty.name()).build()).get();
|
||||
assertEquals(durabilty, shard.getTranslogDurability());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,8 +28,7 @@ import java.io.IOException;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
@TestLogging("index.translog.fs:TRACE")
|
||||
public class FsBufferedTranslogTests extends AbstractTranslogTests {
|
||||
public class BufferedTranslogTests extends TranslogTests {
|
||||
|
||||
@Override
|
||||
protected Translog create() throws IOException {
|
|
@ -21,15 +21,13 @@ package org.elasticsearch.index.translog;
|
|||
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@TestLogging("index.translog.fs:TRACE")
|
||||
public class FsSimpleTranslogTests extends AbstractTranslogTests {
|
||||
public class FsSimpleTranslogTests extends TranslogTests {
|
||||
|
||||
@Override
|
||||
protected Translog create() throws IOException {
|
||||
|
|
|
@ -27,6 +27,8 @@ import org.elasticsearch.common.bytes.BytesArray;
|
|||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -42,6 +44,7 @@ import java.io.EOFException;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
@ -59,7 +62,7 @@ import static org.hamcrest.Matchers.*;
|
|||
*
|
||||
*/
|
||||
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
|
||||
public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
|
||||
public class TranslogTests extends ElasticsearchTestCase {
|
||||
|
||||
protected final ShardId shardId = new ShardId(new Index("index"), 1);
|
||||
|
||||
|
@ -101,9 +104,11 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
protected abstract Translog create() throws IOException;
|
||||
|
||||
protected Translog create() throws IOException {
|
||||
return new Translog(shardId,
|
||||
ImmutableSettings.settingsBuilder().put("index.translog.fs.type", TranslogFile.Type.SIMPLE.name()).build(),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, translogDir);
|
||||
}
|
||||
|
||||
protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) {
|
||||
list.add(op);
|
||||
|
@ -770,4 +775,59 @@ public abstract class AbstractTranslogTests extends ElasticsearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public void testSyncUpTo() throws IOException {
|
||||
int translogOperations = randomIntBetween(10, 100);
|
||||
int count = 0;
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
final Translog.Location location = translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
|
||||
if (randomBoolean()) {
|
||||
assertTrue("at least one operation pending", translog.syncNeeded());
|
||||
assertTrue("this operation has not been synced", translog.ensureSynced(location));
|
||||
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
|
||||
translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
|
||||
assertTrue("one pending operation", translog.syncNeeded());
|
||||
assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
|
||||
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
|
||||
}
|
||||
if (rarely()) {
|
||||
translog.newTranslog();
|
||||
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
|
||||
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
translog.sync();
|
||||
assertFalse("translog has been synced already", translog.ensureSynced(location));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testLocationComparison() throws IOException {
|
||||
List<Translog.Location> locations = newArrayList();
|
||||
int translogOperations = randomIntBetween(10, 100);
|
||||
int count = 0;
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
|
||||
if (rarely()) {
|
||||
translog.newTranslog();
|
||||
}
|
||||
}
|
||||
Collections.shuffle(locations, random());
|
||||
Translog.Location max = locations.get(0);
|
||||
for (Translog.Location location : locations) {
|
||||
max = max(max, location);
|
||||
}
|
||||
|
||||
assertEquals(max.translogId, translog.currentId());
|
||||
final Translog.Operation read = translog.read(max);
|
||||
assertEquals(read.getSource().source.toUtf8(), Integer.toString(count));
|
||||
}
|
||||
|
||||
public static Translog.Location max(Translog.Location a, Translog.Location b) {
|
||||
if (a.compareTo(b) > 0) {
|
||||
return a;
|
||||
}
|
||||
return b;
|
||||
}
|
||||
}
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.indices.memory.breaker;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
|
|
|
@ -99,7 +99,6 @@ import org.elasticsearch.index.mapper.DocumentMapper;
|
|||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
import org.elasticsearch.index.mapper.FieldMapper.Loading;
|
||||
import org.elasticsearch.index.mapper.internal.SizeFieldMapper;
|
||||
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
|
||||
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
|
||||
import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
|
||||
|
@ -586,6 +585,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
|||
if (random.nextBoolean()) {
|
||||
builder.put(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH, random.nextBoolean());
|
||||
}
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(Translog.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durabilty.values()));
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
@ -1730,15 +1732,12 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
|||
}
|
||||
|
||||
final String nodePrefix;
|
||||
final LifecycleScope nodeDirScope;
|
||||
switch (scope) {
|
||||
case TEST:
|
||||
nodePrefix = TEST_CLUSTER_NODE_PREFIX;
|
||||
nodeDirScope = LifecycleScope.TEST;
|
||||
break;
|
||||
case SUITE:
|
||||
nodePrefix = SUITE_CLUSTER_NODE_PREFIX;
|
||||
nodeDirScope = LifecycleScope.SUITE;
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchException("Scope not supported: " + scope);
|
||||
|
|
Loading…
Reference in New Issue