Support for bootstrap segments (#16609)

* Initial support for bootstrap segments.

  - Adds a new API on the coordinator.
  - All processes that have storage locations configured (including tasks)
    talk to the coordinator if they can, and fetch bootstrap segments from it.
  - The fetched segments are then loaded into the segment cache as part of startup.
  - This provides the segment bootstrapping that processes need before they can
    start serving queries or ingesting; a condensed sketch follows this list.

    This patch also lays the foundation to speed up upgrades.
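
  In code terms, the new startup flow is roughly the following (condensed from
  SegmentLoadDropHandler#loadSegmentsOnStartup in the diff below):

      // Combine previously cached segments with bootstrap segments fetched
      // from the coordinator, then load them all into the segment cache.
      final List<DataSegment> segmentsOnStartup = new ArrayList<>();
      segmentsOnStartup.addAll(segmentManager.getCachedSegments());
      segmentsOnStartup.addAll(getBootstrapSegments()); // empty on any fetch error
      // ... announce and load each segment as before ...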

* Fail open by default if there are any errors talking to the coordinator.

* Add test for the failure scenario and clean up logs.

* Clean up and add a debug log.

* Assert the events so we know the list exactly.

* Revert RunRules test.

The rules aren't evaluated if there are no clusters.

* Revert RunRulesTest too.

* Remove debug info.

* Make the API a POST and update the log.

* Fix up UTs.

* Throw 503 from MetadataResource; clean up exception handling and DruidException.

* Remove unused logger, add verification of metrics and docs.

* Update error message

* Update server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Apply suggestions from code review

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Adjust test metric expectations after the rename.

* Add BootstrapSegmentResponse container in the response for future extensibility.

* Rename to BootstrapSegmentsInfo for internal consistency.

* Remove unused log.

* Use a member variable for broadcast segments instead of segmentAssigner.

* Minor cleanup

* Add test for loadable bootstrap segments and clarify comment.

* Address review suggestions.

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Abhishek Radhakrishnan, 2024-06-24 09:27:17 -07:00, committed by GitHub
parent 354a3bea0b
commit 7463589b07
22 changed files with 509 additions and 45 deletions

@ -176,7 +176,11 @@ Returns a list of all segments, overlapping with any of given intervals, for a d
`POST /druid/coordinator/v1/metadata/dataSourceInformation`
Returns information about the specified datasources, including the datasource schema.
`POST /druid/coordinator/v1/metadata/bootstrapSegments`
Returns information about bootstrap segments for all datasources. The returned set includes all broadcast segments if broadcast rules are configured.
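
A sample exchange (illustrative only -- the datasource name and field values below are
hypothetical, and the segment object is abbreviated):

POST /druid/coordinator/v1/metadata/bootstrapSegments

200 OK
[
  {
    "dataSource": "wikipedia",
    "interval": "2024-01-01T00:00:00.000Z/2024-01-02T00:00:00.000Z",
    "version": "2024-01-03T00:00:00.000Z",
    "loadSpec": { ... },
    "size": 12345
  }
]

The endpoint responds with 503 Service Unavailable until the Coordinator duties have
computed the broadcast segment set at least once.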
<a name="coordinator-datasources"></a>


@ -331,19 +331,19 @@ public class DruidException extends RuntimeException
}
/**
* Category of error. The simplest way to describe this is that it exists as a classification of errors that
* enables us to identify the expected response code (e.g. HTTP status code) of a specific DruidException
*/
public enum Category
{
/**
* Means that the exception is being created defensively, because we want to validate something but expect that
* it should never actually be hit. Using this category is good to provide an indication to future reviewers and
* developers that the case being checked is not intended to actually be able to occur in the wild.
*/
DEFENSIVE(500),
/**
* Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
* category have messages written either targeting the USER or ADMIN personas as those are the general users
* of the APIs who could generate invalid inputs.
*/
@ -356,9 +356,8 @@ public class DruidException extends RuntimeException
* Means that an action that was attempted is forbidden
*/
FORBIDDEN(403),
/**
* Means that the requested resource cannot be found.
*/
NOT_FOUND(404),
/**


@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.timeline.DataSegment;
public class BootstrapSegmentsResponse
{
private final CloseableIterator<DataSegment> iterator;
public BootstrapSegmentsResponse(final CloseableIterator<DataSegment> iterator)
{
this.iterator = iterator;
}
public CloseableIterator<DataSegment> getIterator()
{
return iterator;
}
}


@ -86,6 +86,16 @@ public class JsonParserIterator<T> implements CloseableIterator<T>
this.hasTimeout = timeoutAt > -1;
}
/**
* Streams results from the {@code future} so they are not materialized in memory all at once.
* A shortened version of {@link #JsonParserIterator(JavaType, Future, String, Query, String, ObjectMapper)}
* for use when the URL and host parameters, which are used solely for logging and error reporting, are not known.
*/
public JsonParserIterator(JavaType typeRef, Future<InputStream> future, ObjectMapper objectMapper)
{
this(typeRef, future, "", null, "", objectMapper);
}
@Override
public boolean hasNext()
{


@ -20,6 +20,7 @@
package org.apache.druid.client.coordinator;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
@ -58,6 +59,12 @@ public interface CoordinatorClient
*/
ListenableFuture<List<DataSourceInformation>> fetchDataSourceInformation(Set<String> datasources);
/**
* Fetch bootstrap segments from the coordinator. The results must be streamed back to the caller as the
* result set can be large.
*/
ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments();
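// Illustrative usage sketch (not part of this patch), assuming a wired CoordinatorClient
// named "client"; closing the iterator releases the underlying response stream:
//
//   final BootstrapSegmentsResponse response =
//       FutureUtils.getUnchecked(client.fetchBootstrapSegments(), true);
//   try (CloseableIterator<DataSegment> it = response.getIterator()) {
//     while (it.hasNext()) {
//       final DataSegment segment = it.next(); // e.g. hand off to the segment cache
//     }
//   }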
/**
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/


@ -21,17 +21,22 @@ package org.apache.druid.client.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.coordination.LoadableDataSegment;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
@ -156,6 +161,28 @@ public class CoordinatorClientImpl implements CoordinatorClient
);
}
@Override
public ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments()
{
final String path = "/druid/coordinator/v1/metadata/bootstrapSegments";
return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.POST, path),
new InputStreamResponseHandler()
),
in -> new BootstrapSegmentsResponse(
new JsonParserIterator<>(
// Some servers, like the Broker, may have PruneLoadSpec set to true for optimization reasons.
// We specifically use LoadableDataSegment here instead of DataSegment so the callers can still correctly
// load the bootstrap segments, as the load specs are guaranteed not to be pruned.
jsonMapper.getTypeFactory().constructType(LoadableDataSegment.class),
Futures.immediateFuture(in),
jsonMapper
)
)
);
}
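// Design note: InputStreamResponseHandler hands back the raw response stream, and
// JsonParserIterator deserializes one segment at a time from it, so even a large
// bootstrap set is never fully materialized in memory on the client.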
@Override
public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{


@ -323,9 +323,6 @@ public class OverlordClientImpl implements OverlordClient
return new JsonParserIterator<>(
jsonMapper.getTypeFactory().constructType(clazz),
Futures.immediateFuture(in),
"", // We don't know URL at this point, but it's OK to use empty; it's used for logs/errors
null,
"", // We don't know host at this point, but it's OK to use empty; it's used for logs/errors
jsonMapper
);
}


@ -19,7 +19,6 @@
package org.apache.druid.segment.loading;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.io.Files;
import org.apache.druid.java.util.common.FileUtils;
@ -117,12 +116,6 @@ public class LocalDataSegmentPuller implements URIDataPuller
private static final Logger log = new Logger(LocalDataSegmentPuller.class);
@VisibleForTesting
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
{
getSegmentFiles(getFile(segment), dir);
}
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {


@ -19,7 +19,6 @@
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@ -59,8 +58,7 @@ public class LoadableDataSegment extends DataSegment
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JacksonInject PruneSpecsHolder pruneSpecsHolder
@JsonProperty("size") long size
)
{
super(


@ -29,6 +29,9 @@ import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.ISE;
@ -37,6 +40,8 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
@ -83,6 +88,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ServerTypeConfig serverTypeConfig;
private final CoordinatorClient coordinatorClient;
private final ServiceEmitter emitter;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private volatile boolean started = false;
@ -103,7 +110,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter
)
{
this(
@ -115,7 +124,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
),
serverTypeConfig,
coordinatorClient,
emitter
);
}
@ -126,7 +137,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ScheduledExecutorService exec,
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter
)
{
this.config = config;
@ -135,6 +148,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
this.segmentManager = segmentManager;
this.exec = exec;
this.serverTypeConfig = serverTypeConfig;
this.coordinatorClient = coordinatorClient;
this.emitter = emitter;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
@ -151,7 +166,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
log.info("Starting...");
try {
if (segmentManager.canHandleSegments()) {
loadSegmentsOnStartup();
}
if (shouldAnnounce()) {
@ -207,12 +222,17 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
/**
* Bulk loading of the following segments into the page cache at startup:
* <li> Previously cached segments </li>
* <li> Bootstrap segments from the coordinator </li>
*/
private void loadSegmentsOnStartup() throws IOException
{
final List<DataSegment> segmentsOnStartup = new ArrayList<>();
segmentsOnStartup.addAll(segmentManager.getCachedSegments());
segmentsOnStartup.addAll(getBootstrapSegments());
final Stopwatch stopwatch = Stopwatch.createStarted();
// Start a temporary thread pool to load segments into page cache during bootstrap
final ExecutorService loadingExecutor = Execs.multiThreaded(
@ -224,11 +244,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
backgroundSegmentAnnouncer.startAnnouncing();
final int numSegments = segmentsOnStartup.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segmentsOnStartup) {
loadingExecutor.submit(
() -> {
try {
@ -269,7 +289,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
latch.await();
if (failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
log.makeAlert("[%,d] errors seen while loading segments on startup", failedSegments.size())
.addData("failedSegments", failedSegments)
.emit();
}
@ -282,8 +302,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size())
log.makeAlert(e, "Failed to load segments on startup -- likely problem with announcing.")
.addData("numSegments", segmentsOnStartup.size())
.emit();
}
finally {
@ -292,10 +312,41 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
// At this stage, all tasks have been submitted; send a shutdown command to clean up any
// resources allotted for the bootstrapping function.
segmentManager.shutdownBootstrap();
log.info("Cache load of [%d] bootstrap segments took [%,d]ms.", segments.size(), stopwatch.millisElapsed());
log.info("Loaded [%d] segments on startup in [%,d]ms.", segmentsOnStartup.size(), stopwatch.millisElapsed());
}
}
/**
* @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
*/
private List<DataSegment> getBootstrapSegments()
{
log.info("Fetching bootstrap segments from the coordinator.");
final Stopwatch stopwatch = Stopwatch.createStarted();
List<DataSegment> bootstrapSegments = new ArrayList<>();
try {
final BootstrapSegmentsResponse response =
FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true);
bootstrapSegments = ImmutableList.copyOf(response.getIterator());
}
catch (Exception e) {
// By default, we "fail open" when there is any error -- finding the coordinator, or if the API endpoint cannot
// be found during rolling upgrades, or even if it's irrecoverable.
log.warn("Error fetching bootstrap segments from the coordinator: [%s]. ", e.getMessage());
}
finally {
stopwatch.stop();
final long fetchRunMillis = stopwatch.millisElapsed();
emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time", fetchRunMillis));
emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size()));
log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis);
}
return bootstrapSegments;
}
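// Note: getBootstrapSegments() is deliberately best-effort ("fail open"): on any error it
// logs a warning, still emits the bootstrap metrics, and startup proceeds with only the
// locally cached segments.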
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{


@ -174,6 +174,12 @@ public class DruidCoordinator
*/
private volatile SegmentReplicationStatus segmentReplicationStatus = null;
/**
* Set of broadcast segments determined in the latest coordinator run of the {@link RunRules} duty.
* This might contain stale information if the Coordinator duties haven't run or are delayed.
*/
private volatile Set<DataSegment> broadcastSegments = null;
public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
@ -315,6 +321,16 @@ public class DruidCoordinator
return loadStatus;
}
/**
* @return Set of broadcast segments determined by the latest run of the {@link RunRules} duty.
* If the coordinator runs haven't triggered or are delayed, this information may be stale.
*/
@Nullable
public Set<DataSegment> getBroadcastSegments()
{
return broadcastSegments;
}
@Nullable
public Integer getReplicationFactor(SegmentId segmentId)
{
@ -798,6 +814,7 @@ public class DruidCoordinator
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
broadcastSegments = params.getBroadcastSegments();
segmentReplicationStatus = params.getSegmentReplicationStatus();
// Collect stats for unavailable and under-replicated segments


@ -123,6 +123,12 @@ public class DruidCoordinatorRuntimeParams
return segmentAssigner == null ? null : segmentAssigner.getReplicationStatus();
}
@Nullable
public Set<DataSegment> getBroadcastSegments()
{
return segmentAssigner == null ? null : segmentAssigner.getBroadcastSegments();
}
public StrategicSegmentAssigner getSegmentAssigner()
{
return segmentAssigner;


@ -69,6 +69,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
private final Map<String, Integer> tierToHistoricalCount = new HashMap<>();
private final Map<String, Set<SegmentId>> segmentsToDelete = new HashMap<>();
private final Map<String, Set<DataSegment>> segmentsWithZeroRequiredReplicas = new HashMap<>();
private final Set<DataSegment> broadcastSegments = new HashSet<>();
public StrategicSegmentAssigner(
SegmentLoadQueueManager loadQueueManager,
@ -361,6 +362,8 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
entry -> replicaCountMap.computeIfAbsent(segment.getId(), entry.getKey())
.setRequired(entry.getIntValue(), entry.getIntValue())
);
broadcastSegments.add(segment);
}
@Override
@ -398,6 +401,11 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
return false;
}
public Set<DataSegment> getBroadcastSegments()
{
return broadcastSegments;
}
/**
* Drops the broadcast segment if it is loaded on the given server.
* Returns true only if the segment was successfully queued for drop on the server.


@ -471,4 +471,23 @@ public class MetadataResource
);
return Response.status(Response.Status.OK).entity(authorizedDataSourceInformation).build();
}
/**
* @return all bootstrap segments determined by the coordinator.
*/
@POST
@Path("/bootstrapSegments")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response getBootstrapSegments()
{
final Set<DataSegment> broadcastSegments = coordinator.getBroadcastSegments();
if (broadcastSegments == null) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("Bootstrap segments are not initialized yet."
+ " Please ensure that the Coordinator duties are running and try again.")
.build();
}
return Response.status(Response.Status.OK).entity(broadcastSegments).build();
}
}


@ -24,7 +24,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Injector;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -37,6 +42,7 @@ import org.apache.druid.segment.metadata.DataSourceInformation;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.PruneLoadSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@ -58,6 +64,24 @@ public class CoordinatorClientImplTest
private MockServiceClient serviceClient;
private CoordinatorClient coordinatorClient;
private static final DataSegment SEGMENT1 = DataSegment.builder()
.dataSource("xyz")
.interval(Intervals.of("1000/2000"))
.version("1")
.loadSpec(ImmutableMap.of("type", "local", "loc", "foo"))
.shardSpec(new NumberedShardSpec(0, 1))
.size(1)
.build();
private static final DataSegment SEGMENT2 = DataSegment.builder()
.dataSource("xyz")
.interval(Intervals.of("2000/3000"))
.version("1")
.loadSpec(ImmutableMap.of("type", "local", "loc", "bar"))
.shardSpec(new NumberedShardSpec(0, 1))
.size(1)
.build();
@Before
public void setup()
{
@ -181,6 +205,82 @@ public class CoordinatorClientImplTest
);
}
@Test
public void test_fetchBootstrapSegments() throws Exception
{
final List<DataSegment> expectedSegments = ImmutableList.of(SEGMENT1, SEGMENT2);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/bootstrapSegments"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(expectedSegments)
);
final ListenableFuture<BootstrapSegmentsResponse> response = coordinatorClient.fetchBootstrapSegments();
Assert.assertNotNull(response);
final ImmutableList<DataSegment> observedDataSegments = ImmutableList.copyOf(response.get().getIterator());
for (int idx = 0; idx < expectedSegments.size(); idx++) {
Assert.assertEquals(expectedSegments.get(idx).getLoadSpec(), observedDataSegments.get(idx).getLoadSpec());
}
}
/**
* Set up a Guice injector with PruneLoadSpec set to true. This test verifies that the bootstrap segments API
* always returns segments with load specs present, ensuring they can be loaded anywhere.
*/
@Test
public void test_fetchBootstrapSegmentsAreLoadableWhenPruneLoadSpecIsEnabled() throws Exception
{
final List<DataSegment> expectedSegments = ImmutableList.of(SEGMENT1, SEGMENT2);
// Set up a coordinator client with PruneLoadSpec set to true in the injector
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(binder -> binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true))
.build();
final ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
final CoordinatorClient coordinatorClient = new CoordinatorClientImpl(serviceClient, objectMapper);
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/bootstrapSegments"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
objectMapper.writeValueAsBytes(expectedSegments)
);
final ListenableFuture<BootstrapSegmentsResponse> response = coordinatorClient.fetchBootstrapSegments();
Assert.assertNotNull(response);
final ImmutableList<DataSegment> observedDataSegments = ImmutableList.copyOf(response.get().getIterator());
Assert.assertEquals(expectedSegments, observedDataSegments);
for (int idx = 0; idx < expectedSegments.size(); idx++) {
Assert.assertEquals(expectedSegments.get(idx).getLoadSpec(), observedDataSegments.get(idx).getLoadSpec());
}
}
@Test
public void test_fetchEmptyBootstrapSegments() throws Exception
{
final List<DataSegment> segments = ImmutableList.of();
serviceClient.expectAndRespond(
new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/bootstrapSegments"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(segments)
);
final ListenableFuture<BootstrapSegmentsResponse> response = coordinatorClient.fetchBootstrapSegments();
Assert.assertNotNull(response);
Assert.assertEquals(
segments,
ImmutableList.copyOf(response.get().getIterator())
);
}
@Test
public void test_fetchDataSourceInformation() throws Exception
{


@ -20,6 +20,7 @@
package org.apache.druid.client.coordinator;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
@ -62,6 +63,12 @@ public class NoopCoordinatorClient implements CoordinatorClient
throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments()
{
throw new UnsupportedOperationException();
}
@Override
public CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
{


@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.loading.DataSegmentPusher;
@ -34,7 +36,6 @@ import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@ -63,6 +64,8 @@ public class SegmentLoadDropHandlerCacheTest
private SegmentManager segmentManager;
private SegmentLoaderConfig loaderConfig;
private SegmentLocalCacheManager cacheManager;
private TestCoordinatorClient coordinatorClient;
private ServiceEmitter emitter;
private ObjectMapper objectMapper;
@Before
@ -100,7 +103,9 @@ public class SegmentLoadDropHandlerCacheTest
segmentManager = new SegmentManager(cacheManager);
segmentAnnouncer = new TestDataSegmentAnnouncer();
serverAnnouncer = new TestDataServerAnnouncer();
coordinatorClient = new TestCoordinatorClient();
emitter = new StubServiceEmitter();
EmittingLogger.registerEmitter(emitter);
}
@Test
@ -122,7 +127,9 @@ public class SegmentLoadDropHandlerCacheTest
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.BROKER),
coordinatorClient,
emitter
);
loadDropHandler.start();
@ -140,7 +147,9 @@ public class SegmentLoadDropHandlerCacheTest
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.BROKER),
coordinatorClient,
emitter
);
loadDropHandler.start();
@ -171,7 +180,9 @@ public class SegmentLoadDropHandlerCacheTest
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
emitter
);
// Start the load drop handler


@ -22,12 +22,15 @@ package org.apache.druid.server.coordination;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
@ -37,7 +40,6 @@ import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.server.coordination.SegmentChangeStatus.State;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
@ -72,6 +74,8 @@ public class SegmentLoadDropHandlerTest
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private ScheduledExecutorFactory scheduledExecutorFactory;
private TestCoordinatorClient coordinatorClient;
private StubServiceEmitter serviceEmitter;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -136,7 +140,9 @@ public class SegmentLoadDropHandlerTest
};
};
coordinatorClient = new TestCoordinatorClient();
serviceEmitter = new StubServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
}
/**
@ -293,6 +299,71 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
}
@Test
public void testLoadBootstrapSegments() throws Exception
{
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments);
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, coordinatorClient);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments());
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size());
serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
handler.stop();
}
@Test
public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, new NoopCoordinatorClient());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
serviceEmitter.verifyValue("segment/bootstrap/count", 0);
serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
handler.stop();
}
@Test
public void testStartStop() throws Exception
{
@ -467,7 +538,8 @@ public class SegmentLoadDropHandlerTest
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(
noAnnouncerSegmentLoaderConfig,
segmentManager,
coordinatorClient
);
handler.start();
@ -543,12 +615,21 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager, CoordinatorClient coordinatorClient)
{
return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager)
{
return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(
SegmentLoaderConfig config,
SegmentManager segmentManager,
CoordinatorClient coordinatorClient
)
{
return new SegmentLoadDropHandler(
config,
@ -556,7 +637,9 @@ public class SegmentLoadDropHandlerTest
serverAnnouncer,
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter
);
}


@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.timeline.DataSegment;
import java.util.HashSet;
import java.util.Set;
class TestCoordinatorClient extends NoopCoordinatorClient
{
private final Set<DataSegment> bootstrapSegments;
TestCoordinatorClient()
{
this(new HashSet<>());
}
TestCoordinatorClient(final Set<DataSegment> bootstrapSegments)
{
this.bootstrapSegments = bootstrapSegments;
}
@Override
public ListenableFuture<BootstrapSegmentsResponse> fetchBootstrapSegments()
{
return Futures.immediateFuture(
new BootstrapSegmentsResponse(CloseableIterators.withEmptyBaggage(bootstrapSegments.iterator()))
);
}
}


@ -106,7 +106,9 @@ public class ZkCoordinatorTest extends CuratorTestBase
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class),
EasyMock.createNiceMock(ScheduledExecutorService.class),
new ServerTypeConfig(ServerType.HISTORICAL),
new TestCoordinatorClient(),
new NoopServiceEmitter()
)
{
@Override


@ -82,6 +82,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -266,6 +267,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
coordinator.start();
Assert.assertNull(coordinator.getReplicationFactor(dataSegment.getId()));
Assert.assertNull(coordinator.getBroadcastSegments());
// Wait for this coordinator to become leader
leaderAnnouncerLatch.await();
@ -293,6 +295,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
coordinator.getDatasourceToUnavailableSegmentCount();
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
Assert.assertEquals(0, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
Assert.assertEquals(0, coordinator.getBroadcastSegments().size());
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.getTierToDatasourceToUnderReplicatedCount(false);
@ -571,6 +574,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
Assert.assertEquals(new HashSet<>(dataSegments.values()), coordinator.getBroadcastSegments());
// Under-replicated counts are updated only after the next coordinator run
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =


@ -486,6 +486,35 @@ public class MetadataResourceTest
);
}
@Test
public void testGetBootstrapSegments()
{
Mockito.doReturn(ImmutableSet.of(segments[0], segments[1])).when(coordinator).getBroadcastSegments();
Response response = metadataResource.getBootstrapSegments();
final List<DataSegment> observedSegments = extractResponseList(response);
Assert.assertEquals(2, observedSegments.size());
}
@Test
public void testEmptyGetBootstrapSegments()
{
Mockito.doReturn(ImmutableSet.of()).when(coordinator).getBroadcastSegments();
Response response = metadataResource.getBootstrapSegments();
final List<DataSegment> observedSegments = extractResponseList(response);
Assert.assertEquals(0, observedSegments.size());
}
@Test
public void testNullGetBootstrapSegments()
{
Mockito.doReturn(null).when(coordinator).getBroadcastSegments();
Response response = metadataResource.getBootstrapSegments();
Assert.assertEquals(503, response.getStatus());
}
private <T> List<T> extractResponseList(Response response)
{
return Lists.newArrayList(