Bump com.diffplug.spotless from 5.6.1 to 6.2.0 (#1919)
* Bump com.diffplug.spotless from 5.6.1 to 6.2.0 Bumps com.diffplug.spotless from 5.6.1 to 6.2.0. --- updated-dependencies: - dependency-name: com.diffplug.spotless dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> * spotlessApply Signed-off-by: Nicholas Walter Knize <nknize@apache.org> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Nicholas Walter Knize <nknize@apache.org>
This commit is contained in:
parent
d47725d9c3
commit
9689a27b63
|
@ -48,7 +48,7 @@ plugins {
|
|||
id 'lifecycle-base'
|
||||
id 'opensearch.docker-support'
|
||||
id 'opensearch.global-build-info'
|
||||
id "com.diffplug.spotless" version "5.6.1" apply false
|
||||
id "com.diffplug.spotless" version "6.2.0" apply false
|
||||
}
|
||||
|
||||
apply from: 'gradle/build-complete.gradle'
|
||||
|
|
|
@ -176,14 +176,10 @@ public class OpenSearchJavaPlugin implements Plugin<Project> {
|
|||
compileOptions.getRelease().set(releaseVersionProviderFromCompileTask(project, compileTask));
|
||||
});
|
||||
// also apply release flag to groovy, which is used in build-tools
|
||||
project.getTasks()
|
||||
.withType(GroovyCompile.class)
|
||||
.configureEach(
|
||||
compileTask -> {
|
||||
project.getTasks().withType(GroovyCompile.class).configureEach(compileTask -> {
|
||||
// TODO: this probably shouldn't apply to groovy at all?
|
||||
compileTask.getOptions().getRelease().set(releaseVersionProviderFromCompileTask(project, compileTask));
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -205,10 +201,7 @@ public class OpenSearchJavaPlugin implements Plugin<Project> {
|
|||
* Adds additional manifest info to jars
|
||||
*/
|
||||
static void configureJars(Project project) {
|
||||
project.getTasks()
|
||||
.withType(Jar.class)
|
||||
.configureEach(
|
||||
jarTask -> {
|
||||
project.getTasks().withType(Jar.class).configureEach(jarTask -> {
|
||||
// we put all our distributable files under distributions
|
||||
jarTask.getDestinationDirectory().set(new File(project.getBuildDir(), "distributions"));
|
||||
// fixup the jar manifest
|
||||
|
@ -221,22 +214,13 @@ public class OpenSearchJavaPlugin implements Plugin<Project> {
|
|||
// after the doFirst added by the info plugin, and we can override attributes
|
||||
jarTask.getManifest()
|
||||
.attributes(
|
||||
Map.of(
|
||||
"Build-Date",
|
||||
BuildParams.getBuildDate(),
|
||||
"Build-Java-Version",
|
||||
BuildParams.getGradleJavaVersion()
|
||||
)
|
||||
Map.of("Build-Date", BuildParams.getBuildDate(), "Build-Java-Version", BuildParams.getGradleJavaVersion())
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
project.getPluginManager().withPlugin("com.github.johnrengelman.shadow", p -> {
|
||||
project.getTasks()
|
||||
.withType(ShadowJar.class)
|
||||
.configureEach(
|
||||
shadowJar -> {
|
||||
project.getTasks().withType(ShadowJar.class).configureEach(shadowJar -> {
|
||||
/*
|
||||
* Replace the default "-all" classifier with null
|
||||
* which will leave the classifier off of the file name.
|
||||
|
@ -247,8 +231,7 @@ public class OpenSearchJavaPlugin implements Plugin<Project> {
|
|||
* better to be safe
|
||||
*/
|
||||
shadowJar.mergeServiceFiles();
|
||||
}
|
||||
);
|
||||
});
|
||||
// Add "original" classifier to the non-shadowed JAR to distinguish it from the shadow JAR
|
||||
project.getTasks().named(JavaPlugin.JAR_TASK_NAME, Jar.class).configure(jar -> jar.getArchiveClassifier().set("original"));
|
||||
// Make sure we assemble the shadow jar
|
||||
|
|
|
@ -53,16 +53,12 @@ public abstract class PrecommitPlugin implements Plugin<Project> {
|
|||
TaskProvider<Task> precommit = project.getTasks().named(PRECOMMIT_TASK_NAME);
|
||||
precommit.configure(t -> t.dependsOn(task));
|
||||
|
||||
project.getPluginManager()
|
||||
.withPlugin(
|
||||
"java",
|
||||
p -> {
|
||||
project.getPluginManager().withPlugin("java", p -> {
|
||||
// We want to get any compilation error before running the pre-commit checks.
|
||||
for (SourceSet sourceSet : GradleUtils.getJavaSourceSets(project)) {
|
||||
task.configure(t -> t.shouldRunAfter(sourceSet.getClassesTaskName()));
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
public abstract TaskProvider<? extends Task> createTask(Project project);
|
||||
|
|
|
@ -56,10 +56,7 @@ public class PrecommitTaskPlugin implements Plugin<Project> {
|
|||
"lifecycle-base",
|
||||
p -> project.getTasks().named(LifecycleBasePlugin.CHECK_TASK_NAME).configure(t -> t.dependsOn(precommit))
|
||||
);
|
||||
project.getPluginManager()
|
||||
.withPlugin(
|
||||
"java",
|
||||
p -> {
|
||||
project.getPluginManager().withPlugin("java", p -> {
|
||||
// run compilation as part of precommit
|
||||
for (SourceSet sourceSet : GradleUtils.getJavaSourceSets(project)) {
|
||||
precommit.configure(t -> t.dependsOn(sourceSet.getClassesTaskName()));
|
||||
|
@ -67,7 +64,6 @@ public class PrecommitTaskPlugin implements Plugin<Project> {
|
|||
|
||||
// make sure tests run after all precommit tasks
|
||||
project.getTasks().withType(Test.class).configureEach(t -> t.mustRunAfter(precommit));
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -160,9 +160,8 @@ public class Sniffer implements Closeable {
|
|||
// tasks are run by a single threaded executor, so swapping is safe with a simple volatile variable
|
||||
ScheduledTask previousTask = nextScheduledTask;
|
||||
nextScheduledTask = new ScheduledTask(task, future);
|
||||
assert initialized.get() == false
|
||||
|| previousTask.task.isSkipped()
|
||||
|| previousTask.task.hasStarted() : "task that we are replacing is neither " + "cancelled nor has it ever started";
|
||||
assert initialized.get() == false || previousTask.task.isSkipped() || previousTask.task.hasStarted()
|
||||
: "task that we are replacing is neither " + "cancelled nor has it ever started";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -206,13 +206,10 @@ public class WhenThingsGoWrongTests extends ScriptTestCase {
|
|||
* the parser with right-curly brackets to allow statements to be delimited by them at the end of blocks.
|
||||
*/
|
||||
public void testRCurlyNotDelim() {
|
||||
IllegalArgumentException e = expectScriptThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> {
|
||||
IllegalArgumentException e = expectScriptThrows(IllegalArgumentException.class, () -> {
|
||||
// We don't want PICKY here so we get the normal error message
|
||||
exec("def i = 1} return 1", emptyMap(), emptyMap(), false);
|
||||
}
|
||||
);
|
||||
});
|
||||
assertEquals("unexpected token ['}'] was expecting one of [{<EOF>, ';'}].", e.getMessage());
|
||||
}
|
||||
|
||||
|
|
|
@ -413,8 +413,8 @@ public class AzureBlobStore implements BlobStore {
|
|||
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws URISyntaxException,
|
||||
BlobStorageException, IOException {
|
||||
assert inputStream
|
||||
.markSupported() : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
|
||||
assert inputStream.markSupported()
|
||||
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
|
||||
final Tuple<BlobServiceClient, Supplier<Context>> client = client();
|
||||
final BlobContainerClient blobContainer = client.v1().getBlobContainerClient(container);
|
||||
|
|
|
@ -199,8 +199,8 @@ public class HttpReadWriteHandler implements NioChannelHandler {
|
|||
+ ". Found type: "
|
||||
+ message.getClass()
|
||||
+ ".";
|
||||
assert ((HttpPipelinedResponse) message)
|
||||
.getDelegateRequest() instanceof NioHttpResponse : "This channel only pipelined responses with a delegate of type: "
|
||||
assert ((HttpPipelinedResponse) message).getDelegateRequest() instanceof NioHttpResponse
|
||||
: "This channel only pipelined responses with a delegate of type: "
|
||||
+ NioHttpResponse.class
|
||||
+ ". Found type: "
|
||||
+ ((HttpPipelinedResponse) message).getDelegateRequest().getClass()
|
||||
|
|
|
@ -75,16 +75,14 @@ public class Cleanup {
|
|||
sh.runIgnoreExitCode("ps aux | grep -i 'org.opensearch.bootstrap.OpenSearch' | awk {'print $2'} | xargs kill -9");
|
||||
});
|
||||
|
||||
Platforms.onWindows(
|
||||
() -> {
|
||||
Platforms.onWindows(() -> {
|
||||
// the view of processes returned by Get-Process doesn't expose command line arguments, so we use WMI here
|
||||
sh.runIgnoreExitCode(
|
||||
"Get-WmiObject Win32_Process | "
|
||||
+ "Where-Object { $_.CommandLine -Match 'org.opensearch.bootstrap.OpenSearch' } | "
|
||||
+ "ForEach-Object { $_.Terminate() }"
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
Platforms.onLinux(Cleanup::purgePackagesLinux);
|
||||
|
||||
|
|
|
@ -346,22 +346,16 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase {
|
|||
)
|
||||
);
|
||||
}
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// check all retention leases have been synced to all replicas
|
||||
for (final ShardRouting replicaShard : clusterService().state()
|
||||
.routingTable()
|
||||
.index("index")
|
||||
.shard(0)
|
||||
.replicaShards()) {
|
||||
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
|
||||
final String replicaShardNodeId = replicaShard.currentNodeId();
|
||||
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
|
||||
final IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaShardNodeName)
|
||||
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
|
||||
assertThat(replica.getRetentionLeases(), equalTo(primary.getRetentionLeases()));
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -84,21 +84,13 @@ public class PersistentTasksExecutorFullRestartIT extends OpenSearchIntegTestCas
|
|||
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
|
||||
|
||||
// Make sure that at least one of the tasks is running
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(
|
||||
client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get()
|
||||
.getTasks()
|
||||
.size(),
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().size(),
|
||||
greaterThan(0)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
// Restart cluster
|
||||
internalCluster().fullRestart();
|
||||
|
@ -113,21 +105,13 @@ public class PersistentTasksExecutorFullRestartIT extends OpenSearchIntegTestCas
|
|||
}
|
||||
|
||||
logger.info("Waiting for {} tasks to start", numberOfTasks);
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for all tasks to start
|
||||
assertThat(
|
||||
client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get()
|
||||
.getTasks()
|
||||
.size(),
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().size(),
|
||||
equalTo(numberOfTasks)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
logger.info("Complete all tasks");
|
||||
// Complete the running task and make sure it finishes properly
|
||||
|
@ -136,8 +120,7 @@ public class PersistentTasksExecutorFullRestartIT extends OpenSearchIntegTestCas
|
|||
equalTo(numberOfTasks)
|
||||
);
|
||||
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Make sure the task is removed from the cluster state
|
||||
assertThat(
|
||||
((PersistentTasksCustomMetadata) internalCluster().clusterService()
|
||||
|
@ -146,8 +129,7 @@ public class PersistentTasksExecutorFullRestartIT extends OpenSearchIntegTestCas
|
|||
.custom(PersistentTasksCustomMetadata.TYPE)).tasks(),
|
||||
empty()
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,21 +95,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
|
||||
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
|
||||
long allocationId = future.get().getAllocationId();
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(
|
||||
client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get()
|
||||
.getTasks()
|
||||
.size(),
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().size(),
|
||||
equalTo(1)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
TaskInfo firstRunningTask = client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
|
@ -130,15 +122,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
);
|
||||
|
||||
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to disappear completely
|
||||
assertThat(
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(),
|
||||
empty()
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
public void testPersistentActionCompletion() throws Exception {
|
||||
|
@ -147,21 +137,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
String taskId = UUIDs.base64UUID();
|
||||
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
|
||||
long allocationId = future.get().getAllocationId();
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(
|
||||
client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get()
|
||||
.getTasks()
|
||||
.size(),
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().size(),
|
||||
equalTo(1)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
TaskInfo firstRunningTask = client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
|
@ -225,15 +207,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
|
||||
internalCluster().stopRandomNode(settings -> "test".equals(settings.get("node.attr.test_attr")));
|
||||
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to disappear completely
|
||||
assertThat(
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(),
|
||||
empty()
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
// Remove the persistent task
|
||||
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
|
||||
|
@ -368,21 +348,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2);
|
||||
assertFutureThrows(future2, ResourceAlreadyExistsException.class);
|
||||
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(
|
||||
client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get()
|
||||
.getTasks()
|
||||
.size(),
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().size(),
|
||||
equalTo(1)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
TaskInfo firstRunningTask = client().admin()
|
||||
.cluster()
|
||||
|
@ -400,15 +372,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
);
|
||||
|
||||
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to disappear completely
|
||||
assertThat(
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(),
|
||||
empty()
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
public void testUnassignRunningPersistentTask() throws Exception {
|
||||
|
@ -489,21 +459,13 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
|
|||
}
|
||||
|
||||
private static void waitForTaskToStart() throws Exception {
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to start
|
||||
assertThat(
|
||||
client().admin()
|
||||
.cluster()
|
||||
.prepareListTasks()
|
||||
.setActions(TestPersistentTasksExecutor.NAME + "[c]")
|
||||
.get()
|
||||
.getTasks()
|
||||
.size(),
|
||||
client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks().size(),
|
||||
equalTo(1)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private static void assertClusterStateHasTask(String taskId) {
|
||||
|
|
|
@ -810,8 +810,8 @@ public class RelocationIT extends OpenSearchIntegTestCase {
|
|||
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
|
||||
// corrupting the segments_N files in order to make sure future recovery re-send files
|
||||
logger.debug("corrupting [{}] to {}. file name: [{}]", action, connection.getNode(), chunkRequest.name());
|
||||
assert chunkRequest.content().toBytesRef().bytes == chunkRequest.content()
|
||||
.toBytesRef().bytes : "no internal reference!!";
|
||||
assert chunkRequest.content().toBytesRef().bytes == chunkRequest.content().toBytesRef().bytes
|
||||
: "no internal reference!!";
|
||||
byte[] array = chunkRequest.content().toBytesRef().bytes;
|
||||
array[0] = (byte) ~array[0]; // flip one byte in the content
|
||||
corruptionCount.countDown();
|
||||
|
|
|
@ -297,9 +297,7 @@ public final class ExceptionsHelper {
|
|||
* @param throwable the throwable to possibly throw on another thread
|
||||
*/
|
||||
public static void maybeDieOnAnotherThread(final Throwable throwable) {
|
||||
ExceptionsHelper.maybeError(throwable)
|
||||
.ifPresent(
|
||||
error -> {
|
||||
ExceptionsHelper.maybeError(throwable).ifPresent(error -> {
|
||||
/*
|
||||
* Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, sometimes the stack
|
||||
* contains statements that catch any throwable (e.g., Netty, and the JDK futures framework). This means that a rethrow here
|
||||
|
@ -314,8 +312,7 @@ public final class ExceptionsHelper {
|
|||
} finally {
|
||||
new Thread(() -> { throw error; }).start();
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -364,9 +364,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
|||
}
|
||||
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
|
||||
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
|
||||
assert waitForActiveShards.equals(
|
||||
ActiveShardCount.DEFAULT
|
||||
) == false : "waitForActiveShards must not be DEFAULT on the request object, instead it should be NONE";
|
||||
assert waitForActiveShards.equals(ActiveShardCount.DEFAULT) == false
|
||||
: "waitForActiveShards must not be DEFAULT on the request object, instead it should be NONE";
|
||||
if (waitForActiveShards.equals(ActiveShardCount.ALL)) {
|
||||
if (response.getUnassignedShards() == 0 && response.getInitializingShards() == 0) {
|
||||
// if we are waiting for all shards to be active, then the num of unassigned and num of initializing shards must be 0
|
||||
|
|
|
@ -187,12 +187,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
|
|||
TaskInfo snapshotOfRunningTask,
|
||||
ActionListener<GetTaskResponse> listener
|
||||
) {
|
||||
getFinishedTaskFromIndex(
|
||||
thisTask,
|
||||
request,
|
||||
ActionListener.delegateResponse(
|
||||
listener,
|
||||
(delegatedListener, e) -> {
|
||||
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> {
|
||||
/*
|
||||
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
|
||||
* the error isn't a 404 then we'll just throw it back to the user.
|
||||
|
@ -202,9 +197,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
|
|||
} else {
|
||||
delegatedListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -363,10 +363,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
}
|
||||
final long startTime = snapshotInfo.startTime();
|
||||
final long endTime = snapshotInfo.endTime();
|
||||
assert endTime >= startTime
|
||||
|| (endTime == 0L && snapshotInfo.state().completed() == false) : "Inconsistent timestamps found in SnapshotInfo ["
|
||||
+ snapshotInfo
|
||||
+ "]";
|
||||
assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false)
|
||||
: "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]";
|
||||
builder.add(
|
||||
new SnapshotStatus(
|
||||
new Snapshot(repositoryName, snapshotId),
|
||||
|
|
|
@ -109,8 +109,8 @@ class BulkPrimaryExecutionContext {
|
|||
|
||||
/** move to the next item to execute */
|
||||
private void advance() {
|
||||
assert currentItemState == ItemProcessingState.COMPLETED
|
||||
|| currentIndex == -1 : "moving to next but current item wasn't completed (state: " + currentItemState + ")";
|
||||
assert currentItemState == ItemProcessingState.COMPLETED || currentIndex == -1
|
||||
: "moving to next but current item wasn't completed (state: " + currentItemState + ")";
|
||||
currentItemState = ItemProcessingState.INITIAL;
|
||||
currentIndex = findNextNonAborted(currentIndex + 1);
|
||||
retryCounter = 0;
|
||||
|
|
|
@ -878,8 +878,8 @@ public abstract class TransportReplicationAction<
|
|||
// resolve it from the index settings
|
||||
request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
|
||||
}
|
||||
assert request
|
||||
.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
|
||||
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT
|
||||
: "request waitForActiveShards must be set in resolveRequest";
|
||||
|
||||
final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
|
||||
if (primary == null || primary.active() == false) {
|
||||
|
|
|
@ -332,10 +332,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
final Set<String> indexNamesInShards = new HashSet<>();
|
||||
shards.iterator().forEachRemaining(s -> {
|
||||
indexNamesInShards.add(s.key.getIndexName());
|
||||
assert source == null
|
||||
|| s.value.nodeId == null : "Shard snapshot must not be assigned to data node when copying from snapshot ["
|
||||
+ source
|
||||
+ "]";
|
||||
assert source == null || s.value.nodeId == null
|
||||
: "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
|
||||
});
|
||||
assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
|
||||
assert source != null || indexNames.equals(indexNamesInShards) : "Indices in shards "
|
||||
|
@ -348,12 +346,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
|
||||
// Check state consistency for normal snapshots and started clone operations
|
||||
if (source == null || clones.isEmpty() == false) {
|
||||
assert (state.completed() && shardsCompleted)
|
||||
|| (state.completed() == false
|
||||
&& shardsCompleted == false) : "Completed state must imply all shards completed but saw state ["
|
||||
+ state
|
||||
+ "] and shards "
|
||||
+ shards;
|
||||
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
|
||||
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
|
||||
}
|
||||
if (source != null && state.completed()) {
|
||||
assert hasFailures(clones) == false || state == State.FAILED : "Failed shard clones in ["
|
||||
|
@ -567,8 +561,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
userMetadata,
|
||||
version
|
||||
);
|
||||
assert updated.state().completed() == false
|
||||
&& completed(updated.shards().values()) == false : "Only running snapshots allowed but saw [" + updated + "]";
|
||||
assert updated.state().completed() == false && completed(updated.shards().values()) == false
|
||||
: "Only running snapshots allowed but saw [" + updated + "]";
|
||||
return updated;
|
||||
}
|
||||
|
||||
|
@ -966,8 +960,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
for (Entry entry : entries) {
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
|
||||
if (shard.value.isActive()) {
|
||||
assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>())
|
||||
.add(shard.key) : "Found duplicate shard assignments in " + entries;
|
||||
assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(shard.key)
|
||||
: "Found duplicate shard assignments in " + entries;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -378,8 +378,8 @@ public class CoordinationState {
|
|||
throw new CoordinationStateRejectedException("only allow reconfiguration if joinVotes have quorum for new config");
|
||||
}
|
||||
|
||||
assert clusterState.getLastCommittedConfiguration()
|
||||
.equals(getLastCommittedConfiguration()) : "last committed configuration should not change";
|
||||
assert clusterState.getLastCommittedConfiguration().equals(getLastCommittedConfiguration())
|
||||
: "last committed configuration should not change";
|
||||
|
||||
lastPublishedVersion = clusterState.version();
|
||||
lastPublishedConfiguration = clusterState.getLastAcceptedConfiguration();
|
||||
|
|
|
@ -1207,8 +1207,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
private ClusterState clusterStateWithNoMasterBlock(ClusterState clusterState) {
|
||||
if (clusterState.nodes().getMasterNodeId() != null) {
|
||||
// remove block if it already exists before adding new one
|
||||
assert clusterState.blocks()
|
||||
.hasGlobalBlockWithId(NO_MASTER_BLOCK_ID) == false : "NO_MASTER_BLOCK should only be added by Coordinator";
|
||||
assert clusterState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID) == false
|
||||
: "NO_MASTER_BLOCK should only be added by Coordinator";
|
||||
final ClusterBlocks clusterBlocks = ClusterBlocks.builder()
|
||||
.blocks(clusterState.blocks())
|
||||
.addGlobalBlock(noMasterBlockService.getNoMasterBlock())
|
||||
|
|
|
@ -193,20 +193,15 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
|
|||
if (joiniedNodeNameIds.isEmpty() == false) {
|
||||
Set<CoordinationMetadata.VotingConfigExclusion> currentVotingConfigExclusions = currentState.getVotingConfigExclusions();
|
||||
Set<CoordinationMetadata.VotingConfigExclusion> newVotingConfigExclusions = currentVotingConfigExclusions.stream()
|
||||
.map(
|
||||
e -> {
|
||||
.map(e -> {
|
||||
// Update nodeId in VotingConfigExclusion when a new node with excluded node name joins
|
||||
if (CoordinationMetadata.VotingConfigExclusion.MISSING_VALUE_MARKER.equals(e.getNodeId())
|
||||
&& joiniedNodeNameIds.containsKey(e.getNodeName())) {
|
||||
return new CoordinationMetadata.VotingConfigExclusion(
|
||||
joiniedNodeNameIds.get(e.getNodeName()),
|
||||
e.getNodeName()
|
||||
);
|
||||
return new CoordinationMetadata.VotingConfigExclusion(joiniedNodeNameIds.get(e.getNodeName()), e.getNodeName());
|
||||
} else {
|
||||
return e;
|
||||
}
|
||||
}
|
||||
)
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// if VotingConfigExclusions did get updated
|
||||
|
|
|
@ -986,9 +986,8 @@ public class MetadataCreateIndexService {
|
|||
routingNumShards = calculateNumRoutingShards(numTargetShards, indexVersionCreated);
|
||||
}
|
||||
} else {
|
||||
assert IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(
|
||||
indexSettings
|
||||
) == false : "index.number_of_routing_shards should not be present on the target index on resize";
|
||||
assert IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettings) == false
|
||||
: "index.number_of_routing_shards should not be present on the target index on resize";
|
||||
routingNumShards = sourceMetadata.getRoutingNumShards();
|
||||
}
|
||||
return routingNumShards;
|
||||
|
|
|
@ -599,10 +599,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
ensureMutable();
|
||||
assert failedShard.assignedToNode() : "only assigned shards can be failed";
|
||||
assert indexMetadata.getIndex().equals(failedShard.index()) : "shard failed for unknown index (shard entry: " + failedShard + ")";
|
||||
assert getByAllocationId(
|
||||
failedShard.shardId(),
|
||||
failedShard.allocationId().getId()
|
||||
) == failedShard : "shard routing to fail does not exist in routing table, expected: "
|
||||
assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard
|
||||
: "shard routing to fail does not exist in routing table, expected: "
|
||||
+ failedShard
|
||||
+ " but was: "
|
||||
+ getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
|
||||
|
@ -850,12 +848,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
+ oldShard
|
||||
+ " by shard with same shard id but was "
|
||||
+ newShard;
|
||||
assert oldShard.unassigned() == false
|
||||
&& newShard.unassigned() == false : "only assigned shards can be updated in list of assigned shards (prev: "
|
||||
+ oldShard
|
||||
+ ", new: "
|
||||
+ newShard
|
||||
+ ")";
|
||||
assert oldShard.unassigned() == false && newShard.unassigned() == false
|
||||
: "only assigned shards can be updated in list of assigned shards (prev: " + oldShard + ", new: " + newShard + ")";
|
||||
assert oldShard.currentNodeId().equals(newShard.currentNodeId()) : "shard to update "
|
||||
+ oldShard
|
||||
+ " can only update "
|
||||
|
|
|
@ -101,16 +101,13 @@ public final class ShardRouting implements Writeable, ToXContentObject {
|
|||
assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE
|
||||
|| state == ShardRoutingState.INITIALIZING
|
||||
|| state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
|
||||
assert expectedShardSize >= 0
|
||||
|| state != ShardRoutingState.INITIALIZING
|
||||
|| state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state;
|
||||
assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING
|
||||
: expectedShardSize + " state: " + state;
|
||||
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
|
||||
assert (state == ShardRoutingState.UNASSIGNED
|
||||
|| state == ShardRoutingState.INITIALIZING) == (recoverySource != null) : "recovery source only available on unassigned or initializing shard but was "
|
||||
+ state;
|
||||
assert recoverySource == null
|
||||
|| recoverySource == PeerRecoverySource.INSTANCE
|
||||
|| primary : "replica shards always recover from primary";
|
||||
assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null)
|
||||
: "recovery source only available on unassigned or initializing shard but was " + state;
|
||||
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary
|
||||
: "replica shards always recover from primary";
|
||||
assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node "
|
||||
+ this;
|
||||
}
|
||||
|
@ -589,12 +586,8 @@ public final class ShardRouting implements Writeable, ToXContentObject {
|
|||
**/
|
||||
public boolean isSameAllocation(ShardRouting other) {
|
||||
boolean b = this.allocationId != null && other.allocationId != null && this.allocationId.getId().equals(other.allocationId.getId());
|
||||
assert b == false
|
||||
|| this.currentNodeId.equals(other.currentNodeId) : "ShardRoutings have the same allocation id but not the same node. This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
assert b == false || this.currentNodeId.equals(other.currentNodeId)
|
||||
: "ShardRoutings have the same allocation id but not the same node. This [" + this + "], other [" + other + "]";
|
||||
return b;
|
||||
}
|
||||
|
||||
|
@ -613,50 +606,35 @@ public final class ShardRouting implements Writeable, ToXContentObject {
|
|||
&& this.state == ShardRoutingState.INITIALIZING
|
||||
&& this.allocationId.getId().equals(other.allocationId.getRelocationId());
|
||||
|
||||
assert b == false
|
||||
|| other.state == ShardRoutingState.RELOCATING : "ShardRouting is a relocation target but the source shard state isn't relocating. This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
assert b == false || other.state == ShardRoutingState.RELOCATING
|
||||
: "ShardRouting is a relocation target but the source shard state isn't relocating. This [" + this + "], other [" + other + "]";
|
||||
|
||||
assert b == false
|
||||
|| other.allocationId.getId()
|
||||
.equals(
|
||||
this.allocationId.getRelocationId()
|
||||
) : "ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId."
|
||||
assert b == false || other.allocationId.getId().equals(this.allocationId.getRelocationId())
|
||||
: "ShardRouting is a relocation target but the source id isn't equal to source's allocationId.getRelocationId."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
|
||||
assert b == false
|
||||
|| other.currentNodeId()
|
||||
.equals(
|
||||
this.relocatingNodeId
|
||||
) : "ShardRouting is a relocation target but source current node id isn't equal to target relocating node."
|
||||
assert b == false || other.currentNodeId().equals(this.relocatingNodeId)
|
||||
: "ShardRouting is a relocation target but source current node id isn't equal to target relocating node."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
|
||||
assert b == false
|
||||
|| this.currentNodeId()
|
||||
.equals(
|
||||
other.relocatingNodeId
|
||||
) : "ShardRouting is a relocation target but current node id isn't equal to source relocating node."
|
||||
assert b == false || this.currentNodeId().equals(other.relocatingNodeId)
|
||||
: "ShardRouting is a relocation target but current node id isn't equal to source relocating node."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
|
||||
assert b == false
|
||||
|| this.shardId.equals(
|
||||
other.shardId
|
||||
) : "ShardRouting is a relocation target but both indexRoutings are not of the same shard id."
|
||||
assert b == false || this.shardId.equals(other.shardId)
|
||||
: "ShardRouting is a relocation target but both indexRoutings are not of the same shard id."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
|
@ -680,48 +658,35 @@ public final class ShardRouting implements Writeable, ToXContentObject {
|
|||
&& other.state == ShardRoutingState.INITIALIZING
|
||||
&& other.allocationId.getId().equals(this.allocationId.getRelocationId());
|
||||
|
||||
assert b == false
|
||||
|| this.state == ShardRoutingState.RELOCATING : "ShardRouting is a relocation source but shard state isn't relocating. This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
assert b == false || this.state == ShardRoutingState.RELOCATING
|
||||
: "ShardRouting is a relocation source but shard state isn't relocating. This [" + this + "], other [" + other + "]";
|
||||
|
||||
assert b == false
|
||||
|| this.allocationId.getId()
|
||||
.equals(
|
||||
other.allocationId.getRelocationId()
|
||||
) : "ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId."
|
||||
assert b == false || this.allocationId.getId().equals(other.allocationId.getRelocationId())
|
||||
: "ShardRouting is a relocation source but the allocation id isn't equal to other.allocationId.getRelocationId."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
|
||||
assert b == false
|
||||
|| this.currentNodeId()
|
||||
.equals(
|
||||
other.relocatingNodeId
|
||||
) : "ShardRouting is a relocation source but current node isn't equal to other's relocating node."
|
||||
assert b == false || this.currentNodeId().equals(other.relocatingNodeId)
|
||||
: "ShardRouting is a relocation source but current node isn't equal to other's relocating node."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
|
||||
assert b == false
|
||||
|| other.currentNodeId()
|
||||
.equals(
|
||||
this.relocatingNodeId
|
||||
) : "ShardRouting is a relocation source but relocating node isn't equal to other's current node."
|
||||
assert b == false || other.currentNodeId().equals(this.relocatingNodeId)
|
||||
: "ShardRouting is a relocation source but relocating node isn't equal to other's current node."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], other ["
|
||||
+ other
|
||||
+ "]";
|
||||
|
||||
assert b == false
|
||||
|| this.shardId.equals(other.shardId) : "ShardRouting is a relocation source but both indexRoutings are not of the same shard."
|
||||
assert b == false || this.shardId.equals(other.shardId)
|
||||
: "ShardRouting is a relocation source but both indexRoutings are not of the same shard."
|
||||
+ " This ["
|
||||
+ this
|
||||
+ "], target ["
|
||||
|
|
|
@ -528,8 +528,8 @@ public class AllocationService {
|
|||
|
||||
private void reroute(RoutingAllocation allocation) {
|
||||
assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
|
||||
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation)
|
||||
.isEmpty() : "auto-expand replicas out of sync with number of nodes in the cluster";
|
||||
assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
|
||||
: "auto-expand replicas out of sync with number of nodes in the cluster";
|
||||
assert assertInitialized();
|
||||
|
||||
removeDelayMarkers(allocation);
|
||||
|
@ -602,12 +602,10 @@ public class AllocationService {
|
|||
RoutingNodes routingNodes = routingAllocation.routingNodes();
|
||||
for (ShardRouting startedShard : startedShardEntries) {
|
||||
assert startedShard.initializing() : "only initializing shards can be started";
|
||||
assert routingAllocation.metadata()
|
||||
.index(startedShard.shardId().getIndex()) != null : "shard started for unknown index (shard entry: " + startedShard + ")";
|
||||
assert startedShard == routingNodes.getByAllocationId(
|
||||
startedShard.shardId(),
|
||||
startedShard.allocationId().getId()
|
||||
) : "shard routing to start does not exist in routing table, expected: "
|
||||
assert routingAllocation.metadata().index(startedShard.shardId().getIndex()) != null
|
||||
: "shard started for unknown index (shard entry: " + startedShard + ")";
|
||||
assert startedShard == routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId())
|
||||
: "shard routing to start does not exist in routing table, expected: "
|
||||
+ startedShard
|
||||
+ " but was: "
|
||||
+ routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId());
|
||||
|
|
|
@ -86,10 +86,8 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
|
|||
|
||||
@Override
|
||||
public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
|
||||
assert Objects.equals(
|
||||
initializingShard.allocationId().getId(),
|
||||
startedShard.allocationId().getId()
|
||||
) : "initializingShard.allocationId ["
|
||||
assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId())
|
||||
: "initializingShard.allocationId ["
|
||||
+ initializingShard.allocationId().getId()
|
||||
+ "] and startedShard.allocationId ["
|
||||
+ startedShard.allocationId().getId()
|
||||
|
@ -171,10 +169,8 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
|
|||
ShardId shardId,
|
||||
Updates updates
|
||||
) {
|
||||
assert Sets.haveEmptyIntersection(
|
||||
updates.addedAllocationIds,
|
||||
updates.removedAllocationIds
|
||||
) : "allocation ids cannot be both added and removed in the same allocation round, added ids: "
|
||||
assert Sets.haveEmptyIntersection(updates.addedAllocationIds, updates.removedAllocationIds)
|
||||
: "allocation ids cannot be both added and removed in the same allocation round, added ids: "
|
||||
+ updates.addedAllocationIds
|
||||
+ ", removed ids: "
|
||||
+ updates.removedAllocationIds;
|
||||
|
@ -217,9 +213,8 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
|
|||
inSyncAllocationIds.removeAll(updates.removedAllocationIds);
|
||||
|
||||
assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false
|
||||
|| inSyncAllocationIds.contains(
|
||||
RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID
|
||||
) == false : "fake allocation id has to be removed, inSyncAllocationIds:" + inSyncAllocationIds;
|
||||
|| inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false
|
||||
: "fake allocation id has to be removed, inSyncAllocationIds:" + inSyncAllocationIds;
|
||||
|
||||
// Prevent set of inSyncAllocationIds to grow unboundedly. This can happen for example if we don't write to a primary
|
||||
// but repeatedly shut down nodes that have active replicas.
|
||||
|
@ -258,9 +253,8 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
|
|||
inSyncAllocationIds.add(updates.firstFailedPrimary.allocationId().getId());
|
||||
}
|
||||
|
||||
assert inSyncAllocationIds.isEmpty() == false
|
||||
|| oldInSyncAllocationIds.isEmpty() : "in-sync allocations cannot become empty after they have been non-empty: "
|
||||
+ oldInSyncAllocationIds;
|
||||
assert inSyncAllocationIds.isEmpty() == false || oldInSyncAllocationIds.isEmpty()
|
||||
: "in-sync allocations cannot become empty after they have been non-empty: " + oldInSyncAllocationIds;
|
||||
|
||||
// be extra safe here and only update in-sync set if it is non-empty
|
||||
if (inSyncAllocationIds.isEmpty() == false) {
|
||||
|
@ -295,11 +289,8 @@ public class IndexMetadataUpdater extends RoutingChangesObserver.AbstractRouting
|
|||
int shardNumber = shardEntry.getKey().getId();
|
||||
Set<String> oldInSyncAllocations = oldIndexMetadata.inSyncAllocationIds(shardNumber);
|
||||
Set<String> idsToRemove = shardEntry.getValue().stream().map(e -> e.getAllocationId()).collect(Collectors.toSet());
|
||||
assert idsToRemove.stream()
|
||||
.allMatch(id -> oldRoutingTable.getByAllocationId(shardEntry.getKey(), id) == null) : "removing stale ids: "
|
||||
+ idsToRemove
|
||||
+ ", some of which have still a routing entry: "
|
||||
+ oldRoutingTable;
|
||||
assert idsToRemove.stream().allMatch(id -> oldRoutingTable.getByAllocationId(shardEntry.getKey(), id) == null)
|
||||
: "removing stale ids: " + idsToRemove + ", some of which have still a routing entry: " + oldRoutingTable;
|
||||
Set<String> remainingInSyncAllocations = Sets.difference(oldInSyncAllocations, idsToRemove);
|
||||
assert remainingInSyncAllocations.isEmpty() == false : "Set of in-sync ids cannot become empty for shard "
|
||||
+ shardEntry.getKey()
|
||||
|
|
|
@ -91,9 +91,8 @@ public class RoutingNodesChangedObserver implements RoutingChangesObserver {
|
|||
|
||||
@Override
|
||||
public void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource) {
|
||||
assert removedReplicaRelocationSource.primary() == false
|
||||
&& removedReplicaRelocationSource.isRelocationTarget() : "expected replica relocation target shard "
|
||||
+ removedReplicaRelocationSource;
|
||||
assert removedReplicaRelocationSource.primary() == false && removedReplicaRelocationSource.isRelocationTarget()
|
||||
: "expected replica relocation target shard " + removedReplicaRelocationSource;
|
||||
setChanged();
|
||||
}
|
||||
|
||||
|
@ -108,11 +107,8 @@ public class RoutingNodesChangedObserver implements RoutingChangesObserver {
|
|||
assert oldReplica.initializing() && oldReplica.primary() == false : "expected initializing replica shard " + oldReplica;
|
||||
assert reinitializedReplica.initializing() && reinitializedReplica.primary() == false : "expected reinitialized replica shard "
|
||||
+ reinitializedReplica;
|
||||
assert oldReplica.allocationId()
|
||||
.getId()
|
||||
.equals(
|
||||
reinitializedReplica.allocationId().getId()
|
||||
) == false : "expected allocation id to change for reinitialized replica shard (old: "
|
||||
assert oldReplica.allocationId().getId().equals(reinitializedReplica.allocationId().getId()) == false
|
||||
: "expected allocation id to change for reinitialized replica shard (old: "
|
||||
+ oldReplica
|
||||
+ " new: "
|
||||
+ reinitializedReplica
|
||||
|
|
|
@ -237,9 +237,8 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|||
|
||||
public static boolean assertClusterOrMasterStateThread() {
|
||||
assert Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME)
|
||||
|| Thread.currentThread()
|
||||
.getName()
|
||||
.contains(MasterService.MASTER_UPDATE_THREAD_NAME) : "not called from the master/cluster state update thread";
|
||||
|| Thread.currentThread().getName().contains(MasterService.MASTER_UPDATE_THREAD_NAME)
|
||||
: "not called from the master/cluster state update thread";
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -539,7 +539,8 @@ public class MasterService extends AbstractLifecycleComponent {
|
|||
*/
|
||||
public List<PendingClusterTask> pendingTasks() {
|
||||
return Arrays.stream(threadPoolExecutor.getPending()).map(pending -> {
|
||||
assert pending.task instanceof SourcePrioritizedRunnable : "thread pool executor should only use SourcePrioritizedRunnable instances but found: "
|
||||
assert pending.task instanceof SourcePrioritizedRunnable
|
||||
: "thread pool executor should only use SourcePrioritizedRunnable instances but found: "
|
||||
+ pending.task.getClass().getName();
|
||||
SourcePrioritizedRunnable task = (SourcePrioritizedRunnable) pending.task;
|
||||
return new PendingClusterTask(
|
||||
|
|
|
@ -71,9 +71,8 @@ public abstract class TaskBatcher {
|
|||
return;
|
||||
}
|
||||
final BatchedTask firstTask = tasks.get(0);
|
||||
assert tasks.stream()
|
||||
.allMatch(t -> t.batchingKey == firstTask.batchingKey) : "tasks submitted in a batch should share the same batching key: "
|
||||
+ tasks;
|
||||
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey)
|
||||
: "tasks submitted in a batch should share the same batching key: " + tasks;
|
||||
// convert to an identity map to check for dups based on task identity
|
||||
final Map<Object, BatchedTask> tasksIdentity = tasks.stream()
|
||||
.collect(
|
||||
|
@ -124,8 +123,8 @@ public abstract class TaskBatcher {
|
|||
if (toRemove.isEmpty() == false) {
|
||||
BatchedTask firstTask = toRemove.get(0);
|
||||
Object batchingKey = firstTask.batchingKey;
|
||||
assert tasks.stream()
|
||||
.allMatch(t -> t.batchingKey == batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks;
|
||||
assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey)
|
||||
: "tasks submitted in a batch should share the same batching key: " + tasks;
|
||||
synchronized (tasksPerBatchingKey) {
|
||||
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
|
||||
if (existingTasks != null) {
|
||||
|
|
|
@ -570,12 +570,8 @@ public abstract class LocalTimeOffset {
|
|||
long utcStart = transition.toEpochSecond() * 1000;
|
||||
long offsetBeforeMillis = transition.getOffsetBefore().getTotalSeconds() * 1000;
|
||||
long offsetAfterMillis = transition.getOffsetAfter().getTotalSeconds() * 1000;
|
||||
assert (false == previous instanceof Transition)
|
||||
|| ((Transition) previous).startUtcMillis < utcStart : "transition list out of order at ["
|
||||
+ previous
|
||||
+ "] and ["
|
||||
+ transition
|
||||
+ "]";
|
||||
assert (false == previous instanceof Transition) || ((Transition) previous).startUtcMillis < utcStart
|
||||
: "transition list out of order at [" + previous + "] and [" + transition + "]";
|
||||
assert previous.millis != offsetAfterMillis : "transition list is has a duplicate at ["
|
||||
+ previous
|
||||
+ "] and ["
|
||||
|
|
|
@ -117,10 +117,8 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
* entirely for these readers.
|
||||
*/
|
||||
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context) throws IOException {
|
||||
assert context.reader()
|
||||
.getCoreCacheHelper()
|
||||
.getKey()
|
||||
.equals(readerKey) : "context's reader is not the same as the reader class was initialized on.";
|
||||
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey)
|
||||
: "context's reader is not the same as the reader class was initialized on.";
|
||||
int docID = getDocID(id, context);
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
|
@ -174,10 +172,8 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
|
||||
/** Return null if id is not found. */
|
||||
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
|
||||
assert context.reader()
|
||||
.getCoreCacheHelper()
|
||||
.getKey()
|
||||
.equals(readerKey) : "context's reader is not the same as the reader class was initialized on.";
|
||||
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey)
|
||||
: "context's reader is not the same as the reader class was initialized on.";
|
||||
final int docID = getDocID(id, context);
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
|
||||
|
|
|
@ -173,9 +173,8 @@ public class Setting<T> implements ToXContentObject {
|
|||
Validator<T> validator,
|
||||
Property... properties
|
||||
) {
|
||||
assert this instanceof SecureSetting
|
||||
|| this.isGroupSetting()
|
||||
|| parser.apply(defaultValue.apply(Settings.EMPTY)) != null : "parser returned null";
|
||||
assert this instanceof SecureSetting || this.isGroupSetting() || parser.apply(defaultValue.apply(Settings.EMPTY)) != null
|
||||
: "parser returned null";
|
||||
this.key = key;
|
||||
this.fallbackSetting = fallbackSetting;
|
||||
this.defaultValue = defaultValue;
|
||||
|
|
|
@ -455,9 +455,8 @@ public class BigArrays {
|
|||
private <T extends AbstractBigArray> T resizeInPlace(T array, long newSize) {
|
||||
final long oldMemSize = array.ramBytesUsed();
|
||||
final long oldSize = array.size();
|
||||
assert oldMemSize == array.ramBytesEstimated(
|
||||
oldSize
|
||||
) : "ram bytes used should equal that which was previously estimated: ramBytesUsed="
|
||||
assert oldMemSize == array.ramBytesEstimated(oldSize)
|
||||
: "ram bytes used should equal that which was previously estimated: ramBytesUsed="
|
||||
+ oldMemSize
|
||||
+ ", ramBytesEstimated="
|
||||
+ array.ramBytesEstimated(oldSize);
|
||||
|
|
|
@ -178,9 +178,8 @@ public final class QueueResizingOpenSearchThreadPoolExecutor extends OpenSearchT
|
|||
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
|
||||
|
||||
final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
|
||||
assert taskExecutionNanos >= 0
|
||||
|| (failedOrRejected
|
||||
&& taskExecutionNanos == -1) : "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: "
|
||||
assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1)
|
||||
: "expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: "
|
||||
+ taskExecutionNanos
|
||||
+ ", failedOrRejected: "
|
||||
+ failedOrRejected;
|
||||
|
|
|
@ -563,10 +563,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
}
|
||||
|
||||
private static boolean assertPathsDoNotExist(final Path[] paths) {
|
||||
Set<Path> existingPaths = Stream.of(paths)
|
||||
.filter(FileSystemUtils::exists)
|
||||
.filter(
|
||||
leftOver -> {
|
||||
Set<Path> existingPaths = Stream.of(paths).filter(FileSystemUtils::exists).filter(leftOver -> {
|
||||
// Relaxed assertion for the special case where only the empty state directory exists after deleting
|
||||
// the shard directory because it was created again as a result of a metadata read action concurrently.
|
||||
try (DirectoryStream<Path> children = Files.newDirectoryStream(leftOver)) {
|
||||
|
@ -584,9 +581,7 @@ public final class NodeEnvironment implements Closeable {
|
|||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
}
|
||||
)
|
||||
.collect(Collectors.toSet());
|
||||
}).collect(Collectors.toSet());
|
||||
assert existingPaths.size() == 0 : "Paths exist that should have been deleted: " + existingPaths;
|
||||
return existingPaths.size() == 0;
|
||||
}
|
||||
|
|
|
@ -125,8 +125,8 @@ public final class NodeMetadata {
|
|||
public NodeMetadata build() {
|
||||
final Version nodeVersion;
|
||||
if (this.nodeVersion == null) {
|
||||
assert Version.CURRENT.major <= LegacyESVersion.V_7_0_0.major
|
||||
+ 1 : "version is required in the node metadata from v9 onwards";
|
||||
assert Version.CURRENT.major <= LegacyESVersion.V_7_0_0.major + 1
|
||||
: "version is required in the node metadata from v9 onwards";
|
||||
nodeVersion = Version.V_EMPTY;
|
||||
} else {
|
||||
nodeVersion = this.nodeVersion;
|
||||
|
|
|
@ -134,8 +134,8 @@ public class GatewayMetaState implements Closeable {
|
|||
long currentTerm = onDiskState.currentTerm;
|
||||
|
||||
if (onDiskState.empty()) {
|
||||
assert Version.CURRENT.major <= LegacyESVersion.V_7_0_0.major
|
||||
+ 1 : "legacy metadata loader is not needed anymore from v9 onwards";
|
||||
assert Version.CURRENT.major <= LegacyESVersion.V_7_0_0.major + 1
|
||||
: "legacy metadata loader is not needed anymore from v9 onwards";
|
||||
final Tuple<Manifest, Metadata> legacyState = metaStateService.loadFullState();
|
||||
if (legacyState.v1().isEmpty() == false) {
|
||||
metadata = legacyState.v2();
|
||||
|
|
|
@ -368,9 +368,8 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
|
|||
}
|
||||
|
||||
if (allocationId != null) {
|
||||
assert nodeShardState.storeException() == null
|
||||
|| nodeShardState
|
||||
.storeException() instanceof ShardLockObtainFailedException : "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a "
|
||||
assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException
|
||||
: "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a "
|
||||
+ "store throwing "
|
||||
+ nodeShardState.storeException();
|
||||
numberOfAllocationsFound++;
|
||||
|
|
|
@ -128,8 +128,8 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
|
|||
finalContent = BytesArray.EMPTY;
|
||||
}
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
assert restResponse
|
||||
.status() == RestStatus.METHOD_NOT_ALLOWED : "request HTTP method is unsupported but HTTP status is not METHOD_NOT_ALLOWED(405)";
|
||||
assert restResponse.status() == RestStatus.METHOD_NOT_ALLOWED
|
||||
: "request HTTP method is unsupported but HTTP status is not METHOD_NOT_ALLOWED(405)";
|
||||
}
|
||||
|
||||
final HttpResponse httpResponse = httpRequest.createResponse(restResponse.status(), finalContent);
|
||||
|
|
|
@ -57,11 +57,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene87Codec {
|
|||
private final DocValuesFormat dvFormat = new Lucene80DocValuesFormat(Lucene80DocValuesFormat.Mode.BEST_COMPRESSION);
|
||||
|
||||
static {
|
||||
assert Codec.forName(Lucene.LATEST_CODEC)
|
||||
.getClass()
|
||||
.isAssignableFrom(PerFieldMappingPostingFormatCodec.class) : "PerFieldMappingPostingFormatCodec must subclass the latest "
|
||||
+ "lucene codec: "
|
||||
+ Lucene.LATEST_CODEC;
|
||||
assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class)
|
||||
: "PerFieldMappingPostingFormatCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
|
||||
}
|
||||
|
||||
public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService mapperService, Logger logger) {
|
||||
|
|
|
@ -1466,11 +1466,8 @@ public abstract class Engine implements Closeable {
|
|||
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
|
||||
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
|
||||
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
|
||||
assert (origin == Origin.PRIMARY)
|
||||
|| (ifSeqNo == UNASSIGNED_SEQ_NO
|
||||
&& ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM) : "cas operations are only allowed if origin is primary. get ["
|
||||
+ origin
|
||||
+ "]";
|
||||
assert (origin == Origin.PRIMARY) || (ifSeqNo == UNASSIGNED_SEQ_NO && ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM)
|
||||
: "cas operations are only allowed if origin is primary. get [" + origin + "]";
|
||||
this.doc = doc;
|
||||
this.isRetry = isRetry;
|
||||
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
|
||||
|
@ -1585,11 +1582,8 @@ public abstract class Engine implements Closeable {
|
|||
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
|
||||
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
|
||||
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
|
||||
assert (origin == Origin.PRIMARY)
|
||||
|| (ifSeqNo == UNASSIGNED_SEQ_NO
|
||||
&& ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM) : "cas operations are only allowed if origin is primary. get ["
|
||||
+ origin
|
||||
+ "]";
|
||||
assert (origin == Origin.PRIMARY) || (ifSeqNo == UNASSIGNED_SEQ_NO && ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM)
|
||||
: "cas operations are only allowed if origin is primary. get [" + origin + "]";
|
||||
this.type = Objects.requireNonNull(type);
|
||||
this.id = Objects.requireNonNull(id);
|
||||
this.ifSeqNo = ifSeqNo;
|
||||
|
|
|
@ -480,8 +480,8 @@ public class InternalEngine extends Engine {
|
|||
|
||||
}
|
||||
syncTranslog(); // to persist noops associated with the advancement of the local checkpoint
|
||||
assert localCheckpointTracker
|
||||
.getPersistedCheckpoint() == maxSeqNo : "persisted local checkpoint did not advance to max seq no; is ["
|
||||
assert localCheckpointTracker.getPersistedCheckpoint() == maxSeqNo
|
||||
: "persisted local checkpoint did not advance to max seq no; is ["
|
||||
+ localCheckpointTracker.getPersistedCheckpoint()
|
||||
+ "], max seq no ["
|
||||
+ maxSeqNo
|
||||
|
@ -1348,10 +1348,10 @@ public class InternalEngine extends Engine {
|
|||
int reservedDocs,
|
||||
IndexResult earlyResultOnPreFlightError
|
||||
) {
|
||||
assert useLuceneUpdateDocument == false
|
||||
|| indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene";
|
||||
assert (indexIntoLucene
|
||||
&& earlyResultOnPreFlightError != null) == false : "can only index into lucene or have a preflight result but not both."
|
||||
assert useLuceneUpdateDocument == false || indexIntoLucene
|
||||
: "use lucene update is set to true, but we're not indexing into lucene";
|
||||
assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false
|
||||
: "can only index into lucene or have a preflight result but not both."
|
||||
+ "indexIntoLucene: "
|
||||
+ indexIntoLucene
|
||||
+ " earlyResultOnPreFlightError:"
|
||||
|
@ -1699,8 +1699,8 @@ public class InternalEngine extends Engine {
|
|||
int reservedDocs,
|
||||
DeleteResult earlyResultOnPreflightError
|
||||
) {
|
||||
assert (deleteFromLucene
|
||||
&& earlyResultOnPreflightError != null) == false : "can only delete from lucene or have a preflight result but not both."
|
||||
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false
|
||||
: "can only delete from lucene or have a preflight result but not both."
|
||||
+ "deleteFromLucene: "
|
||||
+ deleteFromLucene
|
||||
+ " earlyResultOnPreFlightError:"
|
||||
|
@ -1808,9 +1808,8 @@ public class InternalEngine extends Engine {
|
|||
tombstone.version().setLongValue(1L);
|
||||
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
|
||||
final ParseContext.Document doc = tombstone.docs().get(0);
|
||||
assert doc.getField(
|
||||
SeqNoFieldMapper.TOMBSTONE_NAME
|
||||
) != null : "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
|
||||
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
|
||||
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
|
||||
doc.add(softDeletesField);
|
||||
indexWriter.addDocument(doc);
|
||||
} catch (final Exception ex) {
|
||||
|
@ -2367,9 +2366,8 @@ public class InternalEngine extends Engine {
|
|||
@Override
|
||||
protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
|
||||
if (isClosed.compareAndSet(false, true)) {
|
||||
assert rwl.isWriteLockedByCurrentThread()
|
||||
|| failEngineLock
|
||||
.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
|
||||
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
|
||||
: "Either the write lock must be held or the engine must be currently be failing itself";
|
||||
try {
|
||||
this.versionMap.clear();
|
||||
if (internalReaderManager != null) {
|
||||
|
|
|
@ -140,8 +140,8 @@ public class GetResult implements Writeable, Iterable<DocumentField>, ToXContent
|
|||
+ seqNo
|
||||
+ " primaryTerm: "
|
||||
+ primaryTerm;
|
||||
assert exists
|
||||
|| (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) : "doc not found but seqNo/primaryTerm are set";
|
||||
assert exists || (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM)
|
||||
: "doc not found but seqNo/primaryTerm are set";
|
||||
this.version = version;
|
||||
this.exists = exists;
|
||||
this.source = source;
|
||||
|
|
|
@ -230,9 +230,8 @@ public class LocalCheckpointTracker {
|
|||
@SuppressForbidden(reason = "Object#notifyAll")
|
||||
private void updateCheckpoint(AtomicLong checkPoint, LongObjectHashMap<CountedBitSet> bitSetMap) {
|
||||
assert Thread.holdsLock(this);
|
||||
assert getBitSetForSeqNo(bitSetMap, checkPoint.get() + 1).get(
|
||||
seqNoToBitSetOffset(checkPoint.get() + 1)
|
||||
) : "updateCheckpoint is called but the bit following the checkpoint is not set";
|
||||
assert getBitSetForSeqNo(bitSetMap, checkPoint.get() + 1).get(seqNoToBitSetOffset(checkPoint.get() + 1))
|
||||
: "updateCheckpoint is called but the bit following the checkpoint is not set";
|
||||
try {
|
||||
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
|
||||
long bitSetKey = getBitSetKey(checkPoint.get());
|
||||
|
|
|
@ -845,23 +845,15 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
assert pendingInSync.isEmpty() || (primaryMode && !handoffInProgress);
|
||||
|
||||
// the computed global checkpoint is always up-to-date
|
||||
assert !primaryMode
|
||||
|| globalCheckpoint == computeGlobalCheckpoint(
|
||||
pendingInSync,
|
||||
checkpoints.values(),
|
||||
globalCheckpoint
|
||||
) : "global checkpoint is not up-to-date, expected: "
|
||||
assert !primaryMode || globalCheckpoint == computeGlobalCheckpoint(pendingInSync, checkpoints.values(), globalCheckpoint)
|
||||
: "global checkpoint is not up-to-date, expected: "
|
||||
+ computeGlobalCheckpoint(pendingInSync, checkpoints.values(), globalCheckpoint)
|
||||
+ " but was: "
|
||||
+ globalCheckpoint;
|
||||
|
||||
// when in primary mode, the global checkpoint is at most the minimum local checkpoint on all in-sync shard copies
|
||||
assert !primaryMode
|
||||
|| globalCheckpoint <= inSyncCheckpointStates(
|
||||
checkpoints,
|
||||
CheckpointState::getLocalCheckpoint,
|
||||
LongStream::min
|
||||
) : "global checkpoint ["
|
||||
assert !primaryMode || globalCheckpoint <= inSyncCheckpointStates(checkpoints, CheckpointState::getLocalCheckpoint, LongStream::min)
|
||||
: "global checkpoint ["
|
||||
+ globalCheckpoint
|
||||
+ "] "
|
||||
+ "for primary mode allocation ID ["
|
||||
|
@ -877,11 +869,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
+ " but replication group is "
|
||||
+ replicationGroup;
|
||||
|
||||
assert replicationGroup == null
|
||||
|| replicationGroup.equals(calculateReplicationGroup()) : "cached replication group out of sync: expected: "
|
||||
+ calculateReplicationGroup()
|
||||
+ " but was: "
|
||||
+ replicationGroup;
|
||||
assert replicationGroup == null || replicationGroup.equals(calculateReplicationGroup())
|
||||
: "cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + replicationGroup;
|
||||
|
||||
// all assigned shards from the routing table are tracked
|
||||
assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getAllAllocationIds()) : "local checkpoints "
|
||||
|
@ -907,9 +896,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
// all tracked shard copies have a corresponding peer-recovery retention lease
|
||||
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
|
||||
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
|
||||
assert retentionLeases.contains(
|
||||
getPeerRecoveryRetentionLeaseId(shardRouting)
|
||||
) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
|
||||
assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|
||||
: "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases;
|
||||
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(
|
||||
retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source()
|
||||
) : "incorrect source ["
|
||||
|
@ -1190,10 +1178,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
if (applyingClusterStateVersion > appliedClusterStateVersion) {
|
||||
// check that the master does not fabricate new in-sync entries out of thin air once we are in primary mode
|
||||
assert !primaryMode
|
||||
|| inSyncAllocationIds.stream()
|
||||
.allMatch(
|
||||
inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync
|
||||
) : "update from master in primary mode contains in-sync ids "
|
||||
|| inSyncAllocationIds.stream().allMatch(inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync)
|
||||
: "update from master in primary mode contains in-sync ids "
|
||||
+ inSyncAllocationIds
|
||||
+ " that have no matching entries in "
|
||||
+ checkpoints;
|
||||
|
|
|
@ -516,9 +516,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
|
||||
assert currentRouting.isRelocationTarget() == false
|
||||
|| currentRouting.primary() == false
|
||||
|| replicationTracker
|
||||
.isPrimaryMode() : "a primary relocation is completed by the master, but primary mode is not active "
|
||||
+ currentRouting;
|
||||
|| replicationTracker.isPrimaryMode()
|
||||
: "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
|
||||
|
||||
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
|
||||
} else if (currentRouting.primary()
|
||||
|
@ -533,12 +532,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
"Shard is marked as relocated, cannot safely move to state " + newRouting.state()
|
||||
);
|
||||
}
|
||||
assert newRouting.active() == false
|
||||
|| state == IndexShardState.STARTED
|
||||
|| state == IndexShardState.CLOSED : "routing is active, but local shard state isn't. routing: "
|
||||
+ newRouting
|
||||
+ ", local state: "
|
||||
+ state;
|
||||
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.CLOSED
|
||||
: "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
|
||||
persistMetadata(path, indexSettings, newRouting, currentRouting, logger);
|
||||
final CountDownLatch shardStateUpdated = new CountDownLatch(1);
|
||||
|
||||
|
@ -726,8 +721,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
|
||||
forceRefreshes.close();
|
||||
// no shard operation permits are being held here, move state from started to relocated
|
||||
assert indexShardOperationPermits
|
||||
.getActiveOperationsCount() == OPERATIONS_BLOCKED : "in-flight operations in progress while moving shard state to relocated";
|
||||
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
|
||||
: "in-flight operations in progress while moving shard state to relocated";
|
||||
/*
|
||||
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
|
||||
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
|
||||
|
@ -1516,9 +1511,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
|
||||
assert OpenSearchDirectoryReader.unwrap(
|
||||
searcher.getDirectoryReader()
|
||||
) != null : "DirectoryReader must be an instance or OpenSearchDirectoryReader";
|
||||
assert OpenSearchDirectoryReader.unwrap(searcher.getDirectoryReader()) != null
|
||||
: "DirectoryReader must be an instance or OpenSearchDirectoryReader";
|
||||
boolean success = false;
|
||||
try {
|
||||
final Engine.Searcher newSearcher = readerWrapper == null ? searcher : wrapSearcher(searcher, readerWrapper);
|
||||
|
@ -1945,8 +1939,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
// but we need to make sure we don't loose deletes until we are done recovering
|
||||
config.setEnableGcDeletes(false);
|
||||
updateRetentionLeasesOnReplica(loadRetentionLeases());
|
||||
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false
|
||||
|| getRetentionLeases().leases().isEmpty() : "expected empty set of retention leases with recovery source ["
|
||||
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
|
||||
: "expected empty set of retention leases with recovery source ["
|
||||
+ recoveryState.getRecoverySource()
|
||||
+ "] but got "
|
||||
+ getRetentionLeases();
|
||||
|
@ -2085,9 +2079,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
assert assertReplicationTarget();
|
||||
} else {
|
||||
assert origin == Engine.Operation.Origin.LOCAL_RESET;
|
||||
assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "locally resetting without blocking operations, active operations are ["
|
||||
+ getActiveOperations()
|
||||
+ "]";
|
||||
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
|
||||
: "locally resetting without blocking operations, active operations are [" + getActiveOperations() + "]";
|
||||
}
|
||||
if (writeAllowedStates.contains(state) == false) {
|
||||
throw new IllegalIndexShardStateException(
|
||||
|
@ -2793,8 +2786,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
* while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move
|
||||
* to recovery finalization, or even finished recovery before the update arrives here.
|
||||
*/
|
||||
assert state() != IndexShardState.POST_RECOVERY
|
||||
&& state() != IndexShardState.STARTED : "supposedly in-sync shard copy received a global checkpoint ["
|
||||
assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED
|
||||
: "supposedly in-sync shard copy received a global checkpoint ["
|
||||
+ globalCheckpoint
|
||||
+ "] "
|
||||
+ "that is higher than its local checkpoint ["
|
||||
|
@ -2811,9 +2804,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
* @param primaryContext the sequence number context
|
||||
*/
|
||||
public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
|
||||
assert shardRouting.primary()
|
||||
&& shardRouting.isRelocationTarget() : "only primary relocation target can update allocation IDs from primary context: "
|
||||
+ shardRouting;
|
||||
assert shardRouting.primary() && shardRouting.isRelocationTarget()
|
||||
: "only primary relocation target can update allocation IDs from primary context: " + shardRouting;
|
||||
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) : "primary context ["
|
||||
+ primaryContext
|
||||
+ "] does not contain relocation target ["
|
||||
|
|
|
@ -85,9 +85,8 @@ public class ReplicationGroup {
|
|||
replicationTargets.add(relocationTarget);
|
||||
} else {
|
||||
skippedShards.add(relocationTarget);
|
||||
assert inSyncAllocationIds.contains(
|
||||
relocationTarget.allocationId().getId()
|
||||
) == false : "in-sync shard copy but not tracked: " + shard;
|
||||
assert inSyncAllocationIds.contains(relocationTarget.allocationId().getId()) == false
|
||||
: "in-sync shard copy but not tracked: " + shard;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,17 +62,12 @@ public final class ShardPath {
|
|||
public ShardPath(boolean isCustomDataPath, Path dataPath, Path shardStatePath, ShardId shardId) {
|
||||
assert dataPath.getFileName().toString().equals(Integer.toString(shardId.id())) : "dataPath must end with the shard ID but didn't: "
|
||||
+ dataPath.toString();
|
||||
assert shardStatePath.getFileName()
|
||||
.toString()
|
||||
.equals(Integer.toString(shardId.id())) : "shardStatePath must end with the shard ID but didn't: " + dataPath.toString();
|
||||
assert dataPath.getParent()
|
||||
.getFileName()
|
||||
.toString()
|
||||
.equals(shardId.getIndex().getUUID()) : "dataPath must end with index path id but didn't: " + dataPath.toString();
|
||||
assert shardStatePath.getParent()
|
||||
.getFileName()
|
||||
.toString()
|
||||
.equals(shardId.getIndex().getUUID()) : "shardStatePath must end with index path id but didn't: " + dataPath.toString();
|
||||
assert shardStatePath.getFileName().toString().equals(Integer.toString(shardId.id()))
|
||||
: "shardStatePath must end with the shard ID but didn't: " + dataPath.toString();
|
||||
assert dataPath.getParent().getFileName().toString().equals(shardId.getIndex().getUUID())
|
||||
: "dataPath must end with index path id but didn't: " + dataPath.toString();
|
||||
assert shardStatePath.getParent().getFileName().toString().equals(shardId.getIndex().getUUID())
|
||||
: "shardStatePath must end with index path id but didn't: " + dataPath.toString();
|
||||
if (isCustomDataPath && dataPath.equals(shardStatePath)) {
|
||||
throw new IllegalArgumentException("shard state path must be different to the data path when using custom data paths");
|
||||
}
|
||||
|
|
|
@ -103,8 +103,8 @@ final class StoreRecovery {
|
|||
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
|
||||
if (canRecover(indexShard)) {
|
||||
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
|
||||
assert recoveryType == RecoverySource.Type.EMPTY_STORE
|
||||
|| recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType;
|
||||
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE
|
||||
: "expected store recovery type but was: " + recoveryType;
|
||||
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
|
||||
logger.debug("starting recovery from store ...");
|
||||
internalRecoverFromStore(indexShard);
|
||||
|
|
|
@ -1095,9 +1095,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
Collections.unmodifiableList(different),
|
||||
Collections.unmodifiableList(missing)
|
||||
);
|
||||
assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN)
|
||||
? 1
|
||||
: 0) : "some files are missing recoveryDiff size: ["
|
||||
assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0)
|
||||
: "some files are missing recoveryDiff size: ["
|
||||
+ recoveryDiff.size()
|
||||
+ "] metadata size: ["
|
||||
+ this.metadata.size()
|
||||
|
|
|
@ -199,10 +199,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
//
|
||||
// For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that
|
||||
// file exists. If not we don't even try to clean it up and wait until we fail creating it
|
||||
assert Files.exists(nextTranslogFile) == false
|
||||
|| Files.size(nextTranslogFile) <= TranslogHeader.headerSizeInBytes(translogUUID) : "unexpected translog file: ["
|
||||
+ nextTranslogFile
|
||||
+ "]";
|
||||
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogHeader.headerSizeInBytes(translogUUID)
|
||||
: "unexpected translog file: [" + nextTranslogFile + "]";
|
||||
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
|
||||
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
|
||||
logger.warn(
|
||||
|
@ -399,7 +397,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
assert calledFromOutsideOrViaTragedyClose() : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
|
||||
assert calledFromOutsideOrViaTragedyClose()
|
||||
: "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method";
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
try (ReleasableLock lock = writeLock.acquire()) {
|
||||
try {
|
||||
|
@ -439,11 +438,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (readers.isEmpty()) {
|
||||
return current.getGeneration();
|
||||
} else {
|
||||
assert readers.stream()
|
||||
.map(TranslogReader::getGeneration)
|
||||
.min(Long::compareTo)
|
||||
.get()
|
||||
.equals(readers.get(0).getGeneration()) : "the first translog isn't the one with the minimum generation:" + readers;
|
||||
assert readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).get().equals(readers.get(0).getGeneration())
|
||||
: "the first translog isn't the one with the minimum generation:" + readers;
|
||||
return readers.get(0).getGeneration();
|
||||
}
|
||||
}
|
||||
|
@ -740,10 +736,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (snapshots.length == 0) {
|
||||
onClose = () -> {};
|
||||
} else {
|
||||
assert Arrays.stream(snapshots)
|
||||
.map(BaseTranslogReader::getGeneration)
|
||||
.min(Long::compareTo)
|
||||
.get() == snapshots[0].generation : "first reader generation of " + snapshots + " is not the smallest";
|
||||
assert Arrays.stream(snapshots).map(BaseTranslogReader::getGeneration).min(Long::compareTo).get() == snapshots[0].generation
|
||||
: "first reader generation of " + snapshots + " is not the smallest";
|
||||
onClose = acquireTranslogGenFromDeletionPolicy(snapshots[0].generation);
|
||||
}
|
||||
boolean success = false;
|
||||
|
@ -759,8 +753,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
|
||||
private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
|
||||
assert readLock.isHeldByCurrentThread()
|
||||
|| writeLock.isHeldByCurrentThread() : "callers of readersAboveMinSeqNo must hold a lock: readLock ["
|
||||
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread()
|
||||
: "callers of readersAboveMinSeqNo must hold a lock: readLock ["
|
||||
+ readLock.isHeldByCurrentThread()
|
||||
+ "], writeLock ["
|
||||
+ readLock.isHeldByCurrentThread()
|
||||
|
@ -1806,8 +1800,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
current.sync();
|
||||
deleteReaderFiles(reader);
|
||||
}
|
||||
assert readers.isEmpty() == false
|
||||
|| current.generation == minReferencedGen : "all readers were cleaned but the minReferenceGen ["
|
||||
assert readers.isEmpty() == false || current.generation == minReferencedGen
|
||||
: "all readers were cleaned but the minReferenceGen ["
|
||||
+ minReferencedGen
|
||||
+ "] is not the current writer's gen ["
|
||||
+ current.generation
|
||||
|
|
|
@ -850,8 +850,8 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
|
||||
indexShard.addShardFailureCallback(onShardFailure);
|
||||
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> {
|
||||
assert recoveryState.getRecoverySource()
|
||||
.getType() == RecoverySource.Type.LOCAL_SHARDS : "mapping update consumer only required by local shards recovery";
|
||||
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
|
||||
: "mapping update consumer only required by local shards recovery";
|
||||
client.admin()
|
||||
.indices()
|
||||
.preparePutMapping()
|
||||
|
|
|
@ -636,9 +636,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|||
ClusterState clusterState
|
||||
) {
|
||||
final ShardRouting currentRoutingEntry = shard.routingEntry();
|
||||
assert currentRoutingEntry.isSameAllocation(
|
||||
shardRouting
|
||||
) : "local shard has a different allocation id but wasn't cleaned by removeShards. "
|
||||
assert currentRoutingEntry.isSameAllocation(shardRouting)
|
||||
: "local shard has a different allocation id but wasn't cleaned by removeShards. "
|
||||
+ "cluster state: "
|
||||
+ shardRouting
|
||||
+ " local: "
|
||||
|
|
|
@ -260,8 +260,8 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
|
|||
if (removed != null) {
|
||||
shard.recoveryStats().decCurrentAsSource();
|
||||
removed.cancel();
|
||||
assert nodeToHandlers.getOrDefault(removed.targetNode(), Collections.emptySet())
|
||||
.contains(removed) : "Remote recovery was not properly tracked [" + removed + "]";
|
||||
assert nodeToHandlers.getOrDefault(removed.targetNode(), Collections.emptySet()).contains(removed)
|
||||
: "Remote recovery was not properly tracked [" + removed + "]";
|
||||
nodeToHandlers.computeIfPresent(removed.targetNode(), (k, handlersForNode) -> {
|
||||
handlersForNode.remove(removed);
|
||||
if (handlersForNode.isEmpty()) {
|
||||
|
|
|
@ -233,12 +233,8 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|||
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
|
||||
indexShard.prepareForIndexRecovery();
|
||||
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
|
||||
assert startingSeqNo == UNASSIGNED_SEQ_NO
|
||||
|| recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage ["
|
||||
+ recoveryTarget.state().getStage()
|
||||
+ "] starting seqno [ "
|
||||
+ startingSeqNo
|
||||
+ "]";
|
||||
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
|
||||
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
|
||||
startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
|
||||
requestToSend = startRequest;
|
||||
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
|
||||
|
@ -469,9 +465,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|||
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
|
||||
request.retentionLeases(),
|
||||
request.mappingVersionOnPrimary(),
|
||||
ActionListener.wrap(
|
||||
checkpoint -> listener.onResponse(null),
|
||||
e -> {
|
||||
ActionListener.wrap(checkpoint -> listener.onResponse(null), e -> {
|
||||
// do not retry if the mapping on replica is at least as recent as the mapping
|
||||
// that the primary used to index the operations in the request.
|
||||
if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
|
||||
|
@ -479,8 +473,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
|
|||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
)
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -644,10 +644,8 @@ public class RecoverySourceHandler {
|
|||
|
||||
createRetentionLeaseStep.whenComplete(retentionLease -> {
|
||||
final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
|
||||
assert retentionLease == null
|
||||
|| retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint : retentionLease
|
||||
+ " vs "
|
||||
+ lastKnownGlobalCheckpoint;
|
||||
assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint
|
||||
: retentionLease + " vs " + lastKnownGlobalCheckpoint;
|
||||
// Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
|
||||
// the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
|
||||
// to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
|
||||
|
|
|
@ -138,11 +138,8 @@ public class RecoveryState implements ToXContentFragment, Writeable {
|
|||
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) {
|
||||
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
|
||||
RecoverySource recoverySource = shardRouting.recoverySource();
|
||||
assert (recoverySource
|
||||
.getType() == RecoverySource.Type.PEER) == (sourceNode != null) : "peer recovery requires source node, recovery type: "
|
||||
+ recoverySource.getType()
|
||||
+ " source node: "
|
||||
+ sourceNode;
|
||||
assert (recoverySource.getType() == RecoverySource.Type.PEER) == (sourceNode != null)
|
||||
: "peer recovery requires source node, recovery type: " + recoverySource.getType() + " source node: " + sourceNode;
|
||||
this.shardId = shardRouting.shardId();
|
||||
this.primary = shardRouting.primary();
|
||||
this.recoverySource = recoverySource;
|
||||
|
|
|
@ -98,8 +98,8 @@ public class StartRecoveryRequest extends TransportRequest {
|
|||
this.metadataSnapshot = metadataSnapshot;
|
||||
this.primaryRelocation = primaryRelocation;
|
||||
this.startingSeqNo = startingSeqNo;
|
||||
assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
|
||||
|| metadataSnapshot.getHistoryUUID() != null : "starting seq no is set but not history uuid";
|
||||
assert startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || metadataSnapshot.getHistoryUUID() != null
|
||||
: "starting seq no is set but not history uuid";
|
||||
}
|
||||
|
||||
public long recoveryId() {
|
||||
|
|
|
@ -100,9 +100,7 @@ public final class TrackingResultProcessor implements Processor {
|
|||
+ ']'
|
||||
);
|
||||
}
|
||||
ingestDocumentCopy.executePipeline(
|
||||
pipelineToCall,
|
||||
(result, e) -> {
|
||||
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
|
||||
// special handling for pipeline cycle errors
|
||||
if (e instanceof OpenSearchException
|
||||
&& e.getCause() instanceof IllegalStateException
|
||||
|
@ -151,8 +149,7 @@ public final class TrackingResultProcessor implements Processor {
|
|||
);
|
||||
ingestDocument.executePipeline(verbosePipeline, handler);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -1059,8 +1059,8 @@ public class Node implements Closeable {
|
|||
transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
|
||||
transportService.start();
|
||||
assert localNodeFactory.getNode() != null;
|
||||
assert transportService.getLocalNode()
|
||||
.equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided";
|
||||
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
|
||||
: "transportService has a different local node than the factory provided";
|
||||
injector.getInstance(PeerRecoverySourceService.class).start();
|
||||
|
||||
// Load (and maybe upgrade) the metadata stored on disk
|
||||
|
@ -1103,8 +1103,8 @@ public class Node implements Closeable {
|
|||
// start after transport service so the local disco is known
|
||||
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
|
||||
clusterService.start();
|
||||
assert clusterService.localNode()
|
||||
.equals(localNodeFactory.getNode()) : "clusterService has a different local node than the factory provided";
|
||||
assert clusterService.localNode().equals(localNodeFactory.getNode())
|
||||
: "clusterService has a different local node than the factory provided";
|
||||
transportService.acceptIncomingRequests();
|
||||
discovery.startInitialJoin();
|
||||
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings());
|
||||
|
|
|
@ -66,11 +66,8 @@ public final class IndexMetaDataGenerations {
|
|||
final Map<String, String> identifiers;
|
||||
|
||||
IndexMetaDataGenerations(Map<SnapshotId, Map<IndexId, String>> lookup, Map<String, String> identifiers) {
|
||||
assert identifiers.keySet()
|
||||
.equals(lookup.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet())) : "identifier mappings "
|
||||
+ identifiers
|
||||
+ " don't track the same blob ids as the lookup map "
|
||||
+ lookup;
|
||||
assert identifiers.keySet().equals(lookup.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet()))
|
||||
: "identifier mappings " + identifiers + " don't track the same blob ids as the lookup map " + lookup;
|
||||
assert lookup.values().stream().noneMatch(Map::isEmpty) : "Lookup contained empty map [" + lookup + "]";
|
||||
this.lookup = Collections.unmodifiableMap(lookup);
|
||||
this.identifiers = Collections.unmodifiableMap(identifiers);
|
||||
|
|
|
@ -168,11 +168,8 @@ public final class RepositoryData {
|
|||
+ shardGenerations.indices()
|
||||
+ " but snapshots only reference indices "
|
||||
+ indices.values();
|
||||
assert indexSnapshots.values()
|
||||
.stream()
|
||||
.noneMatch(
|
||||
snapshotIdList -> new HashSet<>(snapshotIdList).size() != snapshotIdList.size()
|
||||
) : "Found duplicate snapshot ids per index in [" + indexSnapshots + "]";
|
||||
assert indexSnapshots.values().stream().noneMatch(snapshotIdList -> new HashSet<>(snapshotIdList).size() != snapshotIdList.size())
|
||||
: "Found duplicate snapshot ids per index in [" + indexSnapshots + "]";
|
||||
}
|
||||
|
||||
protected RepositoryData copy() {
|
||||
|
@ -355,8 +352,8 @@ public final class RepositoryData {
|
|||
+ "]";
|
||||
newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY;
|
||||
} else {
|
||||
assert indexMetaBlobs.isEmpty()
|
||||
|| shardGenerations.indices().equals(indexMetaBlobs.keySet()) : "Shard generations contained indices "
|
||||
assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet())
|
||||
: "Shard generations contained indices "
|
||||
+ shardGenerations.indices()
|
||||
+ " but indexMetaData was given for "
|
||||
+ indexMetaBlobs.keySet();
|
||||
|
|
|
@ -1909,13 +1909,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
meta.pendingGeneration()
|
||||
);
|
||||
}
|
||||
assert expectedGen == RepositoryData.EMPTY_REPO_GEN
|
||||
|| uninitializedMeta
|
||||
|| expectedGen == meta.generation() : "Expected non-empty generation ["
|
||||
+ expectedGen
|
||||
+ "] does not match generation tracked in ["
|
||||
+ meta
|
||||
+ "]";
|
||||
assert expectedGen == RepositoryData.EMPTY_REPO_GEN || uninitializedMeta || expectedGen == meta.generation()
|
||||
: "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]";
|
||||
// If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of
|
||||
// all contents by an external process so we reset the safe generation to the empty generation.
|
||||
final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN
|
||||
|
|
|
@ -448,8 +448,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
|
|||
SearchShardTask task,
|
||||
ActionListener<SearchPhaseResult> listener
|
||||
) {
|
||||
assert request.canReturnNullResponseIfMatchNoDocs() == false
|
||||
|| request.numberOfShards() > 1 : "empty responses require more than one shard";
|
||||
assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
|
||||
: "empty responses require more than one shard";
|
||||
final IndexShard shard = getShard(request);
|
||||
rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
|
||||
@Override
|
||||
|
|
|
@ -226,7 +226,8 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
|
|||
out.writeString(name);
|
||||
out.writeGenericValue(metadata);
|
||||
if (out.getVersion().before(LegacyESVersion.V_7_8_0)) {
|
||||
assert pipelineAggregatorsForBwcSerialization != null : "serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization";
|
||||
assert pipelineAggregatorsForBwcSerialization != null
|
||||
: "serializing to pre-7.8.0 versions should have called mergePipelineTreeForBWCSerialization";
|
||||
out.writeNamedWriteableList(pipelineAggregatorsForBwcSerialization);
|
||||
}
|
||||
doWriteTo(out);
|
||||
|
|
|
@ -228,8 +228,8 @@ public class AggregationPath {
|
|||
AggregationPath.PathElement token = pathElements.get(0);
|
||||
// TODO both unwrap and subAggregator are only used here!
|
||||
Aggregator aggregator = ProfilingAggregator.unwrap(root.subAggregator(token.name));
|
||||
assert (aggregator instanceof SingleBucketAggregator)
|
||||
|| (aggregator instanceof NumericMetricsAggregator) : "this should be picked up before aggregation execution - on validate";
|
||||
assert (aggregator instanceof SingleBucketAggregator) || (aggregator instanceof NumericMetricsAggregator)
|
||||
: "this should be picked up before aggregation execution - on validate";
|
||||
return aggregator;
|
||||
}
|
||||
|
||||
|
|
|
@ -96,8 +96,8 @@ public final class InFlightShardSnapshotStates {
|
|||
busyIds.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId);
|
||||
assert assertGenerationConsistency(generations, indexName, shardId, shardState.generation());
|
||||
} else if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) {
|
||||
assert busyIds.getOrDefault(indexName, Collections.emptySet())
|
||||
.contains(shardId) == false : "Can't have a successful operation queued after an in-progress operation";
|
||||
assert busyIds.getOrDefault(indexName, Collections.emptySet()).contains(shardId) == false
|
||||
: "Can't have a successful operation queued after an in-progress operation";
|
||||
generations.computeIfAbsent(indexName, k -> new HashMap<>()).put(shardId, shardState.generation());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -274,9 +274,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
final IndexId indexId = indicesMap.get(shardId.getIndexName());
|
||||
assert indexId != null;
|
||||
assert SnapshotsService.useShardGenerations(entry.version())
|
||||
|| ShardGenerations.fixShardGeneration(
|
||||
snapshotStatus.generation()
|
||||
) == null : "Found non-null, non-numeric shard generation ["
|
||||
|| ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null
|
||||
: "Found non-null, non-numeric shard generation ["
|
||||
+ snapshotStatus.generation()
|
||||
+ "] for snapshot with old-format compatibility";
|
||||
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), new ActionListener<String>() {
|
||||
|
|
|
@ -1924,8 +1924,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
assert readyDeletions(currentState)
|
||||
.v1() == currentState : "Deletes should have been set to ready by finished snapshot deletes and finalizations";
|
||||
assert readyDeletions(currentState).v1() == currentState
|
||||
: "Deletes should have been set to ready by finished snapshot deletes and finalizations";
|
||||
for (SnapshotDeletionsInProgress.Entry entry : currentState.custom(
|
||||
SnapshotDeletionsInProgress.TYPE,
|
||||
SnapshotDeletionsInProgress.EMPTY
|
||||
|
@ -2667,8 +2667,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
repositoriesService.getRepositoryData(deleteEntry.repository(), new ActionListener<RepositoryData>() {
|
||||
@Override
|
||||
public void onResponse(RepositoryData repositoryData) {
|
||||
assert repositoryData
|
||||
.getGenId() == expectedRepoGen : "Repository generation should not change as long as a ready delete is found in the cluster state but found ["
|
||||
assert repositoryData.getGenId() == expectedRepoGen
|
||||
: "Repository generation should not change as long as a ready delete is found in the cluster state but found ["
|
||||
+ expectedRepoGen
|
||||
+ "] in cluster state and ["
|
||||
+ repositoryData.getGenId()
|
||||
|
@ -2746,9 +2746,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
|
||||
assert repositoryData.getSnapshotIds()
|
||||
.stream()
|
||||
.noneMatch(deleteEntry.getSnapshots()::contains) : "Repository data contained snapshot ids "
|
||||
assert repositoryData.getSnapshotIds().stream().noneMatch(deleteEntry.getSnapshots()::contains)
|
||||
: "Repository data contained snapshot ids "
|
||||
+ repositoryData.getSnapshotIds()
|
||||
+ " that should should been deleted by ["
|
||||
+ deleteEntry
|
||||
|
@ -2866,12 +2865,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
} else {
|
||||
leaveRepoLoop(deleteEntry.repository());
|
||||
assert readyDeletions.stream()
|
||||
.noneMatch(entry -> entry.repository().equals(deleteEntry.repository())) : "New finalizations "
|
||||
+ newFinalizations
|
||||
+ " added even though deletes "
|
||||
+ readyDeletions
|
||||
+ " are ready";
|
||||
assert readyDeletions.stream().noneMatch(entry -> entry.repository().equals(deleteEntry.repository()))
|
||||
: "New finalizations " + newFinalizations + " added even though deletes " + readyDeletions + " are ready";
|
||||
for (SnapshotsInProgress.Entry entry : newFinalizations) {
|
||||
endSnapshot(entry, newState.metadata(), repositoryData);
|
||||
}
|
||||
|
@ -3837,8 +3832,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
synchronized boolean assertConsistent() {
|
||||
assert (latestKnownMetaData == null && snapshotsToFinalize.isEmpty())
|
||||
|| (latestKnownMetaData != null
|
||||
&& snapshotsToFinalize.isEmpty() == false) : "Should not hold on to metadata if there are no more queued snapshots";
|
||||
|| (latestKnownMetaData != null && snapshotsToFinalize.isEmpty() == false)
|
||||
: "Should not hold on to metadata if there are no more queued snapshots";
|
||||
assert snapshotsToFinalize.values().stream().noneMatch(Collection::isEmpty) : "Found empty queue in " + snapshotsToFinalize;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -77,11 +77,8 @@ public enum Transports {
|
|||
|
||||
public static boolean assertDefaultThreadContext(ThreadContext threadContext) {
|
||||
assert threadContext.getRequestHeadersOnly().isEmpty()
|
||||
|| threadContext.getRequestHeadersOnly().size() == 1
|
||||
&& threadContext.getRequestHeadersOnly().containsKey(Task.X_OPAQUE_ID) : "expected empty context but was "
|
||||
+ threadContext.getRequestHeadersOnly()
|
||||
+ " on "
|
||||
+ Thread.currentThread().getName();
|
||||
|| threadContext.getRequestHeadersOnly().size() == 1 && threadContext.getRequestHeadersOnly().containsKey(Task.X_OPAQUE_ID)
|
||||
: "expected empty context but was " + threadContext.getRequestHeadersOnly() + " on " + Thread.currentThread().getName();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,8 +66,8 @@ public class JavaJodaTimeDuellingTests extends OpenSearchTestCase {
|
|||
public static void checkJvmProperties() {
|
||||
boolean runtimeJdk8 = JavaVersion.current().getVersion().get(0) == 8;
|
||||
assert (runtimeJdk8 && ("SPI,JRE".equals(System.getProperty("java.locale.providers"))))
|
||||
|| (false == runtimeJdk8
|
||||
&& ("SPI,COMPAT".equals(System.getProperty("java.locale.providers")))) : "`-Djava.locale.providers` needs to be set";
|
||||
|| (false == runtimeJdk8 && ("SPI,COMPAT".equals(System.getProperty("java.locale.providers"))))
|
||||
: "`-Djava.locale.providers` needs to be set";
|
||||
assumeFalse(
|
||||
"won't work in jdk8 " + "because SPI mechanism is not looking at classpath - needs ISOCalendarDataProvider in jre's ext/libs",
|
||||
runtimeJdk8
|
||||
|
|
|
@ -311,16 +311,14 @@ public class IndexServiceTests extends OpenSearchSingleNodeTestCase {
|
|||
// before that this is why we need to wait for the refresh task to be unscheduled and the first doc to be visible
|
||||
assertTrue(refreshTask.isClosed());
|
||||
refreshTask = indexService.getRefreshTask();
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// this one either becomes visible due to a concurrently running scheduled refresh OR due to the force refresh
|
||||
// we are running on updateMetadata if the interval changes
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(1, search.totalHits.value);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
assertFalse(refreshTask.isClosed());
|
||||
// refresh every millisecond
|
||||
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
|
||||
|
@ -330,25 +328,21 @@ public class IndexServiceTests extends OpenSearchSingleNodeTestCase {
|
|||
.setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1ms"))
|
||||
.get();
|
||||
assertTrue(refreshTask.isClosed());
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// this one becomes visible due to the force refresh we are running on updateMetadata if the interval changes
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(2, search.totalHits.value);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
client().prepareIndex("test", "test", "2").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// this one becomes visible due to the scheduled refresh
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(3, search.totalHits.value);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
public void testAsyncFsyncActuallyWorks() throws Exception {
|
||||
|
|
|
@ -5837,12 +5837,10 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
|
||||
engine.refresh("test");
|
||||
engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// the merge listner runs concurrently after the force merge returned
|
||||
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
|
||||
}
|
||||
);
|
||||
});
|
||||
engine.flush();
|
||||
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
|
||||
}
|
||||
|
|
|
@ -209,14 +209,10 @@ public class QueryShardContextTests extends OpenSearchTestCase {
|
|||
}
|
||||
|
||||
public void testFielddataLookupSelfReference() {
|
||||
QueryShardContext queryShardContext = createQueryShardContext(
|
||||
"uuid",
|
||||
null,
|
||||
(field, leafLookup, docId) -> {
|
||||
QueryShardContext queryShardContext = createQueryShardContext("uuid", null, (field, leafLookup, docId) -> {
|
||||
// simulate a runtime field that depends on itself e.g. field: doc['field']
|
||||
return leafLookup.doc().get(field).toString();
|
||||
}
|
||||
);
|
||||
});
|
||||
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> collect("field", queryShardContext));
|
||||
assertEquals("Cyclic dependency detected while resolving runtime fields: field -> field", iae.getMessage());
|
||||
}
|
||||
|
|
|
@ -187,14 +187,12 @@ public class RetentionLeasesReplicationTests extends OpenSearchIndexLevelReplica
|
|||
}
|
||||
group.syncGlobalCheckpoint();
|
||||
group.flush();
|
||||
assertBusy(
|
||||
() -> {
|
||||
assertBusy(() -> {
|
||||
// we turn off the translog retention policy using the generic threadPool
|
||||
for (IndexShard shard : group) {
|
||||
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(0));
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1160,8 +1160,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
|
|||
equalTo(expectedLeaseIds)
|
||||
);
|
||||
|
||||
assertAsTimePasses.accept(
|
||||
() -> {
|
||||
assertAsTimePasses.accept(() -> {
|
||||
// Leases still don't expire
|
||||
assertThat(
|
||||
tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
|
||||
|
@ -1175,8 +1174,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
|
|||
tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(),
|
||||
greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis)
|
||||
);
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
|
||||
for (ShardRouting replicaShard : routingTable.replicaShards()) {
|
||||
|
@ -1188,8 +1186,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
|
|||
|
||||
tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable);
|
||||
|
||||
assertAsTimePasses.accept(
|
||||
() -> {
|
||||
assertAsTimePasses.accept(() -> {
|
||||
// Leases still don't expire
|
||||
assertThat(
|
||||
tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
|
||||
|
@ -1197,8 +1194,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
|
|||
);
|
||||
// ... and any extra peer recovery retention leases are expired immediately since the shard is fully active
|
||||
tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {}));
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
tracker.renewPeerRecoveryRetentionLeases();
|
||||
assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1());
|
||||
|
|
|
@ -131,21 +131,12 @@ public class RetentionLeaseBackgroundSyncActionTests extends OpenSearchTestCase
|
|||
);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
action.shardOperationOnPrimary(
|
||||
request,
|
||||
indexShard,
|
||||
new LatchedActionListener<>(
|
||||
ActionTestUtils.assertNoFailureListener(
|
||||
result -> {
|
||||
action.shardOperationOnPrimary(request, indexShard, new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
|
||||
// the retention leases on the shard should be persisted
|
||||
verify(indexShard).persistRetentionLeases();
|
||||
// we should forward the request containing the current retention leases to the replica
|
||||
assertThat(result.replicaRequest(), sameInstance(request));
|
||||
}
|
||||
),
|
||||
latch
|
||||
)
|
||||
);
|
||||
}), latch));
|
||||
latch.await();
|
||||
}
|
||||
|
||||
|
|
|
@ -126,20 +126,14 @@ public class RetentionLeaseSyncActionTests extends OpenSearchTestCase {
|
|||
);
|
||||
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
|
||||
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
|
||||
action.dispatchedShardOperationOnPrimary(
|
||||
request,
|
||||
indexShard,
|
||||
ActionTestUtils.assertNoFailureListener(
|
||||
result -> {
|
||||
action.dispatchedShardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> {
|
||||
// the retention leases on the shard should be persisted
|
||||
verify(indexShard).persistRetentionLeases();
|
||||
// we should forward the request containing the current retention leases to the replica
|
||||
assertThat(result.replicaRequest(), sameInstance(request));
|
||||
// we should start with an empty replication response
|
||||
assertNull(result.finalResponseIfSuccessful.getShardInfo());
|
||||
}
|
||||
)
|
||||
);
|
||||
}));
|
||||
}
|
||||
|
||||
public void testRetentionLeaseSyncActionOnReplica() throws Exception {
|
||||
|
|
|
@ -208,10 +208,7 @@ public class SumAggregatorTests extends AggregatorTestCase {
|
|||
}
|
||||
|
||||
private void verifySummationOfDoubles(double[] values, double expected, double delta) throws IOException {
|
||||
testAggregation(
|
||||
sum("_name").field(FIELD_NAME),
|
||||
new MatchAllDocsQuery(),
|
||||
iw -> {
|
||||
testAggregation(sum("_name").field(FIELD_NAME), new MatchAllDocsQuery(), iw -> {
|
||||
/*
|
||||
* The sum agg uses a Kahan sumation on the shard to limit
|
||||
* floating point errors. But it doesn't ship the sums to the
|
||||
|
@ -229,10 +226,7 @@ public class SumAggregatorTests extends AggregatorTestCase {
|
|||
.mapToObj(value -> singleton(new NumericDocValuesField(FIELD_NAME, NumericUtils.doubleToSortableLong(value))))
|
||||
.collect(toList())
|
||||
);
|
||||
},
|
||||
result -> assertEquals(expected, result.getValue(), delta),
|
||||
defaultFieldType(NumberType.DOUBLE)
|
||||
);
|
||||
}, result -> assertEquals(expected, result.getValue(), delta), defaultFieldType(NumberType.DOUBLE));
|
||||
}
|
||||
|
||||
public void testUnmapped() throws IOException {
|
||||
|
|
|
@ -80,10 +80,8 @@ public abstract class OpenSearchAllocationTestCase extends OpenSearchTestCase {
|
|||
) {
|
||||
@Override
|
||||
public Long getShardSize(ShardRouting shardRouting) {
|
||||
assert shardRouting.recoverySource()
|
||||
.getType() == RecoverySource.Type.SNAPSHOT : "Expecting a recovery source of type [SNAPSHOT] but got ["
|
||||
+ shardRouting.recoverySource().getType()
|
||||
+ ']';
|
||||
assert shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT
|
||||
: "Expecting a recovery source of type [SNAPSHOT] but got [" + shardRouting.recoverySource().getType() + ']';
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -351,10 +351,8 @@ public abstract class OpenSearchIndexLevelReplicationTestCase extends IndexShard
|
|||
}
|
||||
|
||||
public synchronized void addReplica(IndexShard replica) throws IOException {
|
||||
assert shardRoutings().stream()
|
||||
.anyMatch(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())) == false : "replica with aId ["
|
||||
+ replica.routingEntry().allocationId()
|
||||
+ "] already exists";
|
||||
assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())) == false
|
||||
: "replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
|
||||
replicas.add(replica);
|
||||
if (replicationTargets != null) {
|
||||
replicationTargets.addReplica(replica);
|
||||
|
|
|
@ -454,9 +454,8 @@ public final class InternalTestCluster extends TestCluster {
|
|||
* It's only possible to change {@link #bootstrapMasterNodeIndex} value if autoManageMasterNodes is false.
|
||||
*/
|
||||
public void setBootstrapMasterNodeIndex(int bootstrapMasterNodeIndex) {
|
||||
assert autoManageMasterNodes == false
|
||||
|| bootstrapMasterNodeIndex == -1 : "bootstrapMasterNodeIndex should be -1 if autoManageMasterNodes is true, but was "
|
||||
+ bootstrapMasterNodeIndex;
|
||||
assert autoManageMasterNodes == false || bootstrapMasterNodeIndex == -1
|
||||
: "bootstrapMasterNodeIndex should be -1 if autoManageMasterNodes is true, but was " + bootstrapMasterNodeIndex;
|
||||
this.bootstrapMasterNodeIndex = bootstrapMasterNodeIndex;
|
||||
}
|
||||
|
||||
|
|
|
@ -284,8 +284,8 @@ public final class XContentTestUtils {
|
|||
* </ul>
|
||||
*/
|
||||
static List<String> getInsertPaths(XContentParser parser, Stack<String> currentPath) throws IOException {
|
||||
assert parser.currentToken() == XContentParser.Token.START_OBJECT
|
||||
|| parser.currentToken() == XContentParser.Token.START_ARRAY : "should only be called when new objects or arrays start";
|
||||
assert parser.currentToken() == XContentParser.Token.START_OBJECT || parser.currentToken() == XContentParser.Token.START_ARRAY
|
||||
: "should only be called when new objects or arrays start";
|
||||
List<String> validPaths = new ArrayList<>();
|
||||
// parser.currentName() can be null for root object and unnamed objects in arrays
|
||||
if (parser.currentName() != null) {
|
||||
|
|
|
@ -2485,11 +2485,7 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
|
|||
MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
|
||||
CountDownLatch receivedLatch = new CountDownLatch(1);
|
||||
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
||||
serviceC.registerRequestHandler(
|
||||
"internal:action",
|
||||
ThreadPool.Names.SAME,
|
||||
TestRequest::new,
|
||||
(request, channel, task) -> {
|
||||
serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
||||
// don't block on a network thread here
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
|
@ -2508,8 +2504,7 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
|
|||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
serviceC.start();
|
||||
serviceC.acceptIncomingRequests();
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
|
@ -2564,11 +2559,7 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
|
|||
MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
|
||||
CountDownLatch receivedLatch = new CountDownLatch(1);
|
||||
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
||||
serviceB.registerRequestHandler(
|
||||
"internal:action",
|
||||
ThreadPool.Names.SAME,
|
||||
TestRequest::new,
|
||||
(request, channel, task) -> {
|
||||
serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
||||
// don't block on a network thread here
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
|
@ -2587,8 +2578,7 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
|
|||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
serviceC.start();
|
||||
serviceC.acceptIncomingRequests();
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
|
@ -2688,11 +2678,7 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
|
|||
CountDownLatch sendResponseLatch = new CountDownLatch(1);
|
||||
Exception ex = new RuntimeException("boom");
|
||||
ex.setStackTrace(new StackTraceElement[0]);
|
||||
serviceB.registerRequestHandler(
|
||||
"internal:action",
|
||||
ThreadPool.Names.SAME,
|
||||
TestRequest::new,
|
||||
(request, channel, task) -> {
|
||||
serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
|
||||
// don't block on a network thread here
|
||||
threadPool.generic().execute(new AbstractRunnable() {
|
||||
@Override
|
||||
|
@ -2711,8 +2697,7 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
|
|||
onFailure(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
serviceC.start();
|
||||
serviceC.acceptIncomingRequests();
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
|
|
Loading…
Reference in New Issue