[CCR] Improve error when operations are missing (#35179)
Improve error when operations are missing
This commit is contained in:
parent
cac67f8bcc
commit
46c238d792
|
@ -6,6 +6,8 @@
|
|||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
|
@ -23,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardNotStartedException;
|
||||
|
@ -30,6 +33,7 @@ import org.elasticsearch.index.shard.IndexShardState;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -360,6 +364,21 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
ActionListener<Response> wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e);
|
||||
if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
|
||||
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
|
||||
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
|
||||
listener.onFailure(new ElasticsearchException(message, e));
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
super.doExecute(task, request, wrappedListener);
|
||||
}
|
||||
|
||||
private void globalCheckpointAdvanced(
|
||||
final ShardId shardId,
|
||||
final long globalCheckpoint,
|
||||
|
|
|
@ -5,6 +5,9 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -84,4 +87,33 @@ public class ShardChangesTests extends ESSingleNodeTestCase {
|
|||
assertThat(operation.id(), equalTo("5"));
|
||||
}
|
||||
|
||||
public void testMissingOperations() {
|
||||
client().admin().indices().prepareCreate("index")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.soft_deletes.retention.operations", 0)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0))
|
||||
.get();
|
||||
|
||||
for (int i = 0; i < 16; i++) {
|
||||
client().prepareIndex("index", "_doc", "1").setSource("{}", XContentType.JSON).get();
|
||||
client().prepareDelete("index", "_doc", "1").get();
|
||||
}
|
||||
client().admin().indices().refresh(new RefreshRequest("index")).actionGet();
|
||||
ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index");
|
||||
forceMergeRequest.maxNumSegments(1);
|
||||
client().admin().indices().forceMerge(forceMergeRequest).actionGet();
|
||||
|
||||
ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
|
||||
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
|
||||
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
|
||||
request.setFromSeqNo(0L);
|
||||
request.setMaxOperationCount(1);
|
||||
|
||||
Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
|
||||
"[index.soft_deletes.retention.operations]?"));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue