Fix compaction status API response (#17006)

Description:
#16768 introduces new compaction APIs on the Overlord `/compact/status` and `/compact/progress`.
But the corresponding `OverlordClient` methods do not return an object compatible with the actual
endpoints defined in `OverlordCompactionResource`.

This patch ensures that the objects are compatible.

Changes:
- Add `CompactionStatusResponse` and `CompactionProgressResponse`
- Use these as the return type in `OverlordClient` methods and as the response entity in `OverlordCompactionResource`
- Add `SupervisorCleanupModule` bound on the Coordinator to perform cleanup of supervisors.
Without this module, Coordinator cannot deserialize compaction supervisors.
This commit is contained in:
Kashif Faraz 2024-09-05 10:52:01 -07:00 committed by GitHub
parent b4d83a86c2
commit ba6f804f48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 528 additions and 93 deletions

View File

@ -45,7 +45,8 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -238,13 +239,13 @@ class LocalOverlordClient implements OverlordClient
}
@Override
public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
{
throw new UnsupportedOperationException();
}

View File

@ -19,13 +19,18 @@
package org.apache.druid.indexing.overlord.http;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
import org.apache.druid.indexing.compact.CompactionScheduler;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;
import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.http.security.StateResourceFilter;
import javax.ws.rs.Consumes;
@ -40,8 +45,8 @@ import java.util.Collection;
import java.util.Collections;
/**
* Contains the same logic as {@code CompactionResource} but the APIs are served
* by {@link CompactionScheduler} instead of {@code DruidCoordinator}.
* Contains the same logic as {@code CoordinatorCompactionResource} but the APIs
* are served by {@link CompactionScheduler} instead of {@code DruidCoordinator}.
*/
@Path("/druid/indexer/v1/compaction")
public class OverlordCompactionResource
@ -81,18 +86,14 @@ public class OverlordCompactionResource
}
if (dataSource == null || dataSource.isEmpty()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(Collections.singletonMap("error", "No DataSource specified"))
.build();
return ServletResourceUtils.buildErrorResponseFrom(InvalidInput.exception("No DataSource specified"));
}
final AutoCompactionSnapshot snapshot = scheduler.getCompactionSnapshot(dataSource);
if (snapshot == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity(Collections.singletonMap("error", "Unknown DataSource"))
.build();
return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
} else {
return Response.ok(Collections.singletonMap("remainingSegmentSize", snapshot.getBytesAwaitingCompaction()))
return Response.ok(new CompactionProgressResponse(snapshot.getBytesAwaitingCompaction()))
.build();
}
}
@ -115,13 +116,11 @@ public class OverlordCompactionResource
} else {
AutoCompactionSnapshot autoCompactionSnapshot = scheduler.getCompactionSnapshot(dataSource);
if (autoCompactionSnapshot == null) {
return Response.status(Response.Status.NOT_FOUND)
.entity(Collections.singletonMap("error", "Unknown DataSource"))
.build();
return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
}
snapshots = Collections.singleton(autoCompactionSnapshot);
}
return Response.ok(Collections.singletonMap("latestStatus", snapshots)).build();
return Response.ok(new CompactionStatusResponse(snapshots)).build();
}
@POST
@ -139,12 +138,12 @@ public class OverlordCompactionResource
private Response buildErrorResponseIfSchedulerDisabled()
{
return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
ImmutableMap.of(
"error",
"Compaction Supervisors are disabled on the Overlord."
+ " Use Coordinator APIs to fetch compaction status."
)
).build();
final String msg = "Compaction Supervisors are disabled on the Overlord."
+ " Use Coordinator APIs to fetch compaction status.";
return ServletResourceUtils.buildErrorResponseFrom(
DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.UNSUPPORTED)
.build(msg)
);
}
}

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.indexing.compact.CompactionScheduler;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Map;
public class OverlordCompactionResourceTest
{
private static final CompactionSupervisorConfig SUPERVISOR_ENABLED
= new CompactionSupervisorConfig(true);
private static final CompactionSupervisorConfig SUPERVISOR_DISABLED
= new CompactionSupervisorConfig(false);
private CompactionScheduler scheduler;
@Before
public void setUp()
{
scheduler = EasyMock.createStrictMock(CompactionScheduler.class);
}
@After
public void tearDown()
{
EasyMock.verify(scheduler);
}
@Test
public void testGetCompactionSnapshotWithEmptyDatasource()
{
final Map<String, AutoCompactionSnapshot> allSnapshots = ImmutableMap.of(
TestDataSource.WIKI,
AutoCompactionSnapshot.builder(TestDataSource.WIKI).build()
);
EasyMock.expect(scheduler.getAllCompactionSnapshots())
.andReturn(allSnapshots).once();
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionSnapshots("");
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(
new CompactionStatusResponse(allSnapshots.values()),
response.getEntity()
);
}
@Test
public void testGetCompactionSnapshotWithNullDatasource()
{
final Map<String, AutoCompactionSnapshot> allSnapshots = ImmutableMap.of(
TestDataSource.WIKI,
AutoCompactionSnapshot.builder(TestDataSource.WIKI).build()
);
EasyMock.expect(scheduler.getAllCompactionSnapshots())
.andReturn(allSnapshots).once();
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionSnapshots(null);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(
new CompactionStatusResponse(allSnapshots.values()),
response.getEntity()
);
}
@Test
public void testGetCompactionSnapshotWithValidDatasource()
{
final AutoCompactionSnapshot snapshot = AutoCompactionSnapshot.builder(TestDataSource.WIKI).build();
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.WIKI))
.andReturn(snapshot).once();
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionSnapshots(TestDataSource.WIKI);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(
new CompactionStatusResponse(Collections.singleton(snapshot)),
response.getEntity()
);
}
@Test
public void testGetCompactionSnapshotWithInvalidDatasource()
{
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.KOALA))
.andReturn(null).once();
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionSnapshots(TestDataSource.KOALA);
Assert.assertEquals(404, response.getStatus());
}
@Test
public void testGetProgressForValidDatasource()
{
final AutoCompactionSnapshot.Builder snapshotBuilder
= AutoCompactionSnapshot.builder(TestDataSource.WIKI);
snapshotBuilder.incrementWaitingStats(CompactionStatistics.create(100L, 10L, 1L));
final AutoCompactionSnapshot snapshot = snapshotBuilder.build();
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.WIKI))
.andReturn(snapshot).once();
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionProgress(TestDataSource.WIKI);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(new CompactionProgressResponse(100L), response.getEntity());
}
@Test
public void testGetProgressForNullDatasourceReturnsBadRequest()
{
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionProgress(null);
Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
final Object responseEntity = response.getEntity();
Assert.assertTrue(responseEntity instanceof ErrorResponse);
MatcherAssert.assertThat(
((ErrorResponse) responseEntity).getUnderlyingException(),
DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified")
);
}
@Test
public void testGetProgressForInvalidDatasourceReturnsNotFound()
{
EasyMock.expect(scheduler.getCompactionSnapshot(TestDataSource.KOALA))
.andReturn(null).once();
EasyMock.replay(scheduler);
final Response response = new OverlordCompactionResource(SUPERVISOR_ENABLED, scheduler)
.getCompactionProgress(TestDataSource.KOALA);
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus());
final Object responseEntity = response.getEntity();
Assert.assertTrue(responseEntity instanceof ErrorResponse);
MatcherAssert.assertThat(
((ErrorResponse) responseEntity).getUnderlyingException(),
DruidExceptionMatcher.notFound().expectMessageIs("Unknown DataSource")
);
}
@Test
public void testGetProgressReturnsUnsupportedWhenSupervisorDisabled()
{
EasyMock.replay(scheduler);
verifyResponseWhenSupervisorDisabled(
new OverlordCompactionResource(SUPERVISOR_DISABLED, scheduler)
.getCompactionProgress(TestDataSource.WIKI)
);
}
@Test
public void testGetSnapshotReturnsUnsupportedWhenSupervisorDisabled()
{
EasyMock.replay(scheduler);
verifyResponseWhenSupervisorDisabled(
new OverlordCompactionResource(SUPERVISOR_DISABLED, scheduler)
.getCompactionSnapshots(TestDataSource.WIKI)
);
}
private void verifyResponseWhenSupervisorDisabled(Response response)
{
Assert.assertEquals(501, response.getStatus());
final Object responseEntity = response.getEntity();
Assert.assertTrue(responseEntity instanceof ErrorResponse);
MatcherAssert.assertThat(
((ErrorResponse) responseEntity).getUnderlyingException(),
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.UNSUPPORTED,
"general"
).expectMessageIs(
"Compaction Supervisors are disabled on the Overlord."
+ " Use Coordinator APIs to fetch compaction status."
)
);
}
}

View File

@ -40,6 +40,15 @@ public class DruidExceptionMatcher extends DiagnosingMatcher<Throwable>
);
}
public static DruidExceptionMatcher notFound()
{
return new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.NOT_FOUND,
"notFound"
);
}
public static DruidExceptionMatcher invalidSqlInput()
{
return invalidInput().expectContext("sourceType", "sql");

View File

@ -35,7 +35,8 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -226,7 +227,7 @@ public interface OverlordClient
* <p>
* API: {@code /druid/indexer/v1/compaction/progress}
*/
ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource);
ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource);
/**
* Gets the latest compaction snapshots of one or all datasources.
@ -236,7 +237,7 @@ public interface OverlordClient
* @param dataSource If passed as non-null, then the returned list contains only
* the snapshot for this datasource.
*/
ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource);
ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource);
/**
* Returns a copy of this client with a different retry policy.

View File

@ -45,7 +45,8 @@ import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
@ -314,7 +315,7 @@ public class OverlordClientImpl implements OverlordClient
}
@Override
public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
{
final StringBuilder pathBuilder = new StringBuilder("/druid/indexer/v1/compaction/status");
if (dataSource != null && !dataSource.isEmpty()) {
@ -329,13 +330,13 @@ public class OverlordClientImpl implements OverlordClient
holder -> JacksonUtils.readValue(
jsonMapper,
holder.getContent(),
new TypeReference<List<AutoCompactionSnapshot>>() {}
CompactionStatusResponse.class
)
);
}
@Override
public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
{
final String path = "/druid/indexer/v1/compaction/progress?dataSource=" + dataSource;
return FutureUtils.transform(
@ -343,7 +344,7 @@ public class OverlordClientImpl implements OverlordClient
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), Long.class)
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), CompactionProgressResponse.class)
);
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.compaction;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
* Response of {@code /compaction/progress} API exposed by Coordinator and
* Overlord (when compaction supervisors are enabled).
*/
public class CompactionProgressResponse
{
private final long remainingSegmentSize;
@JsonCreator
public CompactionProgressResponse(
@JsonProperty("remainingSegmentSize") long remainingSegmentSize
)
{
this.remainingSegmentSize = remainingSegmentSize;
}
@JsonProperty
public long getRemainingSegmentSize()
{
return remainingSegmentSize;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionProgressResponse that = (CompactionProgressResponse) o;
return remainingSegmentSize == that.remainingSegmentSize;
}
@Override
public int hashCode()
{
return Objects.hashCode(remainingSegmentSize);
}
}

View File

@ -35,7 +35,6 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
@ -109,7 +108,9 @@ public class CompactionRunSimulator
)
{
final CompactionStatus status = candidateSegments.getCurrentStatus();
if (status.getState() == CompactionStatus.State.COMPLETE) {
if (status == null) {
// do nothing
} else if (status.getState() == CompactionStatus.State.COMPLETE) {
compactedIntervals.addRow(
createRow(candidateSegments, null, null)
);
@ -130,7 +131,7 @@ public class CompactionRunSimulator
// Add a row for each task in order of submission
final CompactionStatus status = candidateSegments.getCurrentStatus();
queuedIntervals.addRow(
createRow(candidateSegments, taskPayload.getTuningConfig(), status.getReason())
createRow(candidateSegments, taskPayload.getTuningConfig(), status == null ? "" : status.getReason())
);
}
};
@ -285,13 +286,13 @@ public class CompactionRunSimulator
}
@Override
public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
{
throw new UnsupportedOperationException();
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.compaction;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import java.util.Collection;
import java.util.Objects;
/**
* Response of {@code /compaction/status} API exposed by Coordinator and
* Overlord (when compaction supervisors are enabled).
*/
public class CompactionStatusResponse
{
private final Collection<AutoCompactionSnapshot> latestStatus;
@JsonCreator
public CompactionStatusResponse(
@JsonProperty("latestStatus") Collection<AutoCompactionSnapshot> latestStatus
)
{
this.latestStatus = latestStatus;
}
@JsonProperty
public Collection<AutoCompactionSnapshot> getLatestStatus()
{
return latestStatus;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionStatusResponse that = (CompactionStatusResponse) o;
return Objects.equals(latestStatus, that.latestStatus);
}
@Override
public int hashCode()
{
return Objects.hashCode(latestStatus);
}
}

View File

@ -21,13 +21,17 @@ package org.apache.druid.server.http;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinator;
@ -82,9 +86,7 @@ public class CoordinatorCompactionResource
)
{
if (dataSource == null || dataSource.isEmpty()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", "No DataSource specified"))
.build();
return ServletResourceUtils.buildErrorResponseFrom(InvalidInput.exception("No DataSource specified"));
}
if (isCompactionSupervisorEnabled()) {
@ -93,9 +95,9 @@ public class CoordinatorCompactionResource
final AutoCompactionSnapshot snapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
if (snapshot == null) {
return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "Unknown DataSource")).build();
return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
} else {
return Response.ok(ImmutableMap.of("remainingSegmentSize", snapshot.getBytesAwaitingCompaction())).build();
return Response.ok(new CompactionProgressResponse(snapshot.getBytesAwaitingCompaction())).build();
}
}
@ -117,11 +119,11 @@ public class CoordinatorCompactionResource
} else {
AutoCompactionSnapshot autoCompactionSnapshot = coordinator.getAutoCompactionSnapshotForDataSource(dataSource);
if (autoCompactionSnapshot == null) {
return Response.status(Response.Status.NOT_FOUND).entity(ImmutableMap.of("error", "Unknown DataSource")).build();
return ServletResourceUtils.buildErrorResponseFrom(NotFound.exception("Unknown DataSource"));
}
snapshots = ImmutableList.of(autoCompactionSnapshot);
}
return Response.ok(ImmutableMap.of("latestStatus", snapshots)).build();
return Response.ok(new CompactionStatusResponse(snapshots)).build();
}
@POST
@ -149,9 +151,9 @@ public class CoordinatorCompactionResource
.entity(cause.getResponse().getContent())
.build();
} else {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
return ServletResourceUtils.buildErrorResponseFrom(
InternalServerError.exception(e.getMessage())
);
}
}
}

View File

@ -28,7 +28,8 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -124,13 +125,13 @@ public class NoopOverlordClient implements OverlordClient
}
@Override
public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
{
throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<Long> getBytesAwaitingCompaction(String dataSource)
public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
{
throw new UnsupportedOperationException();
}

View File

@ -47,6 +47,8 @@ import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
@ -477,18 +479,18 @@ public class OverlordClientImplTest
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
.build(),
AutoCompactionSnapshot.builder("ds2")
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.build()
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/status"),
HttpResponseStatus.OK,
Collections.emptyMap(),
DefaultObjectMapper.INSTANCE.writeValueAsBytes(compactionSnapshots)
DefaultObjectMapper.INSTANCE.writeValueAsBytes(new CompactionStatusResponse(compactionSnapshots))
);
Assert.assertEquals(
compactionSnapshots,
new CompactionStatusResponse(compactionSnapshots),
overlordClient.getCompactionSnapshots(null).get()
);
}
@ -498,19 +500,17 @@ public class OverlordClientImplTest
throws JsonProcessingException, ExecutionException, InterruptedException
{
final List<AutoCompactionSnapshot> compactionSnapshots = Collections.singletonList(
AutoCompactionSnapshot.builder("ds1")
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
.build()
AutoCompactionSnapshot.builder("ds1").build()
);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/status?dataSource=ds1"),
HttpResponseStatus.OK,
Collections.emptyMap(),
DefaultObjectMapper.INSTANCE.writeValueAsBytes(compactionSnapshots)
DefaultObjectMapper.INSTANCE.writeValueAsBytes(new CompactionStatusResponse(compactionSnapshots))
);
Assert.assertEquals(
compactionSnapshots,
new CompactionStatusResponse(compactionSnapshots),
overlordClient.getCompactionSnapshots("ds1").get()
);
}
@ -523,12 +523,12 @@ public class OverlordClientImplTest
new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/compaction/progress?dataSource=ds1"),
HttpResponseStatus.OK,
Collections.emptyMap(),
DefaultObjectMapper.INSTANCE.writeValueAsBytes(100_000L)
DefaultObjectMapper.INSTANCE.writeValueAsBytes(new CompactionProgressResponse(100_000L))
);
Assert.assertEquals(
100_000L,
overlordClient.getBytesAwaitingCompaction("ds1").get().longValue()
new CompactionProgressResponse(100_000L),
overlordClient.getBytesAwaitingCompaction("ds1").get()
);
}
}

View File

@ -19,16 +19,19 @@
package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -37,7 +40,6 @@ import org.junit.Test;
import javax.annotation.Nullable;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class CoordinatorCompactionResourceTest
@ -92,7 +94,7 @@ public class CoordinatorCompactionResourceTest
final Response response = new CoordinatorCompactionResource(mock, overlordClient)
.getCompactionSnapshotForDataSource("");
Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
Assert.assertEquals(new CompactionStatusResponse(expected.values()), response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@ -110,7 +112,7 @@ public class CoordinatorCompactionResourceTest
final Response response = new CoordinatorCompactionResource(mock, overlordClient)
.getCompactionSnapshotForDataSource(null);
Assert.assertEquals(ImmutableMap.of("latestStatus", expected.values()), response.getEntity());
Assert.assertEquals(new CompactionStatusResponse(expected.values()), response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@ -119,12 +121,16 @@ public class CoordinatorCompactionResourceTest
{
String dataSourceName = "datasource_1";
EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(expectedSnapshot).once();
EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName))
.andReturn(expectedSnapshot).once();
EasyMock.replay(mock);
final Response response = new CoordinatorCompactionResource(mock, overlordClient)
.getCompactionSnapshotForDataSource(dataSourceName);
Assert.assertEquals(ImmutableMap.of("latestStatus", ImmutableList.of(expectedSnapshot)), response.getEntity());
Assert.assertEquals(
new CompactionStatusResponse(Collections.singletonList(expectedSnapshot)),
response.getEntity()
);
Assert.assertEquals(200, response.getStatus());
}
@ -133,7 +139,8 @@ public class CoordinatorCompactionResourceTest
{
String dataSourceName = "invalid_datasource";
EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName)).andReturn(null).once();
EasyMock.expect(mock.getAutoCompactionSnapshotForDataSource(dataSourceName))
.andReturn(null).once();
EasyMock.replay(mock);
final Response response = new CoordinatorCompactionResource(mock, overlordClient)
@ -149,14 +156,18 @@ public class CoordinatorCompactionResourceTest
final Response response = new CoordinatorCompactionResource(mock, overlordClient)
.getCompactionProgress(null);
Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
Assert.assertEquals(
ImmutableMap.of("error", "No DataSource specified"),
response.getEntity()
final Object responseEntity = response.getEntity();
Assert.assertTrue(responseEntity instanceof ErrorResponse);
MatcherAssert.assertThat(
((ErrorResponse) responseEntity).getUnderlyingException(),
DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified")
);
}
@Test
public void testGetSnapshotWhenCompactionSupervisorIsEnabled()
public void testGetSnapshotRedirectsToOverlordWhenSupervisorIsEnabled()
{
EasyMock.replay(mock);
@ -172,9 +183,11 @@ public class CoordinatorCompactionResourceTest
}
@Override
public ListenableFuture<List<AutoCompactionSnapshot>> getCompactionSnapshots(@Nullable String dataSource)
public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
{
return Futures.immediateFuture(Collections.singletonList(snapshotFromOverlord));
return Futures.immediateFuture(
new CompactionStatusResponse(Collections.singletonList(snapshotFromOverlord))
);
}
};
@ -182,7 +195,7 @@ public class CoordinatorCompactionResourceTest
.getCompactionSnapshotForDataSource(dataSourceName);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(
Collections.singletonList(snapshotFromOverlord),
new CompactionStatusResponse(Collections.singletonList(snapshotFromOverlord)),
response.getEntity()
);
}

View File

@ -51,11 +51,10 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SupervisorCleanupModule;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -89,7 +88,6 @@ import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.SegmentMetadataCacheConfig;
@ -276,16 +274,6 @@ public class CliCoordinator extends ServerRunnable
LifecycleModule.register(binder, Server.class);
LifecycleModule.register(binder, DataSourcesResource.class);
if (properties.containsKey("druid.coordinator.merge.on")) {
throw new UnsupportedOperationException(
"'druid.coordinator.merge.on' is not supported anymore. "
+ "Please consider using Coordinator's automatic compaction instead. "
+ "See https://druid.apache.org/docs/latest/operations/segment-optimization.html and "
+ "https://druid.apache.org/docs/latest/api-reference/api-reference.html#compaction-configuration "
+ "for more details about compaction."
);
}
bindAnnouncer(
binder,
Coordinator.class,
@ -296,10 +284,6 @@ public class CliCoordinator extends ServerRunnable
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
if (!beOverlord) {
// These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup
binder.bind(TaskStorage.class).toProvider(Providers.of(null));
binder.bind(TaskMaster.class).toProvider(Providers.of(null));
binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
// Bind HeartbeatSupplier only when the service operates independently of Overlord.
binder.bind(new TypeLiteral<Supplier<Map<String, Object>>>() {})
.annotatedWith(Names.named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING))
@ -342,6 +326,7 @@ public class CliCoordinator extends ServerRunnable
// Only add LookupSerdeModule if !beOverlord, since CliOverlord includes it, and having two copies causes
// the injector to get confused due to having multiple bindings for the same classes.
modules.add(new LookupSerdeModule());
modules.add(new SupervisorCleanupModule());
}
return modules;

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import com.google.inject.util.Providers;
import org.apache.druid.indexing.compact.CompactionScheduler;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import java.util.List;
/**
* Contains bindings necessary for Coordinator to perform supervisor cleanup
* when Coordinator and Overlord are running as separate processes.
*/
public class SupervisorCleanupModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
// These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup
binder.bind(TaskStorage.class).toProvider(Providers.of(null));
binder.bind(TaskMaster.class).toProvider(Providers.of(null));
binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
binder.bind(CompactionScheduler.class).toProvider(Providers.of(null));
}
@Override
public List<? extends Module> getJacksonModules()
{
return new SupervisorModule().getJacksonModules();
}
}