remove duplicate Broker ServerInventoryView, improve HttpServerInventoryView logging (#12209)

* changes:
* remove SystemSchema duplicate ServerInventoryView in broker
* suppress duplicate segment added/removed warnings in HttpServerInventoryView when doing a full sync

* fixes
This commit is contained in:
Clint Wylie 2022-02-03 12:57:34 -08:00 committed by GitHub
parent 3717693633
commit 8fd587b28c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 25 deletions

View File

@ -550,7 +550,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
if (request instanceof SegmentChangeRequestLoad) {
DataSegment segment = ((SegmentChangeRequestLoad) request).getSegment();
toRemove.remove(segment.getId());
addSegment(segment);
addSegment(segment, true);
} else {
log.error(
"Server[%s] gave a non-load dataSegmentChangeRequest[%s]., Ignored.",
@ -561,7 +561,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
for (DataSegment segmentToRemove : toRemove.values()) {
removeSegment(segmentToRemove);
removeSegment(segmentToRemove, true);
}
}
@ -570,9 +570,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
{
for (DataSegmentChangeRequest request : changes) {
if (request instanceof SegmentChangeRequestLoad) {
addSegment(((SegmentChangeRequestLoad) request).getSegment());
addSegment(((SegmentChangeRequestLoad) request).getSegment(), false);
} else if (request instanceof SegmentChangeRequestDrop) {
removeSegment(((SegmentChangeRequestDrop) request).getSegment());
removeSegment(((SegmentChangeRequestDrop) request).getSegment(), false);
} else {
log.error(
"Server[%s] gave a non load/drop dataSegmentChangeRequest[%s], Ignored.",
@ -585,7 +585,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
};
}
private void addSegment(DataSegment segment)
private void addSegment(DataSegment segment, boolean fullSync)
{
if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
if (druidServer.getSegment(segment.getId()) == null) {
@ -601,7 +601,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
);
} else {
} else if (!fullSync) {
// duplicates can happen when doing a full sync from a 'reset', so only warn for dupes on delta changes
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
segment.getId(),
@ -611,7 +612,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
private void removeSegment(final DataSegment segment)
private void removeSegment(final DataSegment segment, boolean fullSync)
{
if (druidServer.removeDataSegment(segment.getId()) != null) {
runSegmentCallbacks(
@ -624,7 +625,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
);
} else {
} else if (!fullSync) {
// duplicates can happen when doing a full sync from a 'reset', so only warn for dupes on delta changes
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
segment.getId(),

View File

@ -45,8 +45,8 @@ import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.InventoryView;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
@ -206,7 +206,7 @@ public class SystemSchema extends AbstractSchema
final DruidSchema druidSchema,
final MetadataSegmentView metadataView,
final TimelineServerView serverView,
final InventoryView serverInventoryView,
final FilteredServerInventoryView serverInventoryView,
final AuthorizerMapper authorizerMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
final @IndexingService DruidLeaderClient overlordDruidLeaderClient,
@ -480,13 +480,13 @@ public class SystemSchema extends AbstractSchema
private final AuthorizerMapper authorizerMapper;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final InventoryView serverInventoryView;
private final FilteredServerInventoryView serverInventoryView;
private final DruidLeaderClient overlordLeaderClient;
private final DruidLeaderClient coordinatorLeaderClient;
public ServersTable(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
InventoryView serverInventoryView,
FilteredServerInventoryView serverInventoryView,
AuthorizerMapper authorizerMapper,
DruidLeaderClient overlordLeaderClient,
DruidLeaderClient coordinatorLeaderClient

View File

@ -28,7 +28,7 @@ import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import org.apache.druid.client.InventoryView;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
@ -77,7 +77,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
@Mock
AuthorizerMapper authorizerMapper;
@Mock
private InventoryView serverInventoryView;
private FilteredServerInventoryView serverInventoryView;
@Mock
private DruidLeaderClient coordinatorDruidLeaderClient;
@Mock
@ -110,7 +110,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase
binder.bind(ViewManager.class).toInstance(viewManager);
binder.bind(Escalator.class).toInstance(escalator);
binder.bind(AuthorizerMapper.class).toInstance(authorizerMapper);
binder.bind(InventoryView.class).toInstance(serverInventoryView);
binder.bind(FilteredServerInventoryView.class).toInstance(serverInventoryView);
binder.bind(SegmentManager.class).toInstance(segmentManager);
binder.bind(DruidLeaderClient.class)
.annotatedWith(Coordinator.class)

View File

@ -36,10 +36,9 @@ import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.InventoryView;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
@ -162,7 +161,7 @@ public class SystemSchemaTest extends CalciteTestBase
private static Closer resourceCloser;
private MetadataSegmentView metadataView;
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private InventoryView serverInventoryView;
private FilteredServerInventoryView serverInventoryView;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -260,7 +259,7 @@ public class SystemSchemaTest extends CalciteTestBase
druidSchema.awaitInitialization();
metadataView = EasyMock.createMock(MetadataSegmentView.class);
druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class);
schema = new SystemSchema(
druidSchema,
metadataView,

View File

@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.util;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -35,7 +36,9 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
@ -56,6 +59,7 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
@ -100,6 +104,7 @@ import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AllowAllAuthenticator;
@ -1347,7 +1352,7 @@ public class CalciteTests
/**
* A fake {@link ServerInventoryView} for {@link #createMockSystemSchema}.
*/
private static class FakeServerInventoryView implements ServerInventoryView
private static class FakeServerInventoryView implements FilteredServerInventoryView
{
@Nullable
@Override
@ -1375,13 +1380,20 @@ public class CalciteTests
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
public void registerSegmentCallback(
Executor exec,
ServerView.SegmentCallback callback,
Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
throw new UnsupportedOperationException();
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
public void registerServerRemovedCallback(
Executor exec,
ServerView.ServerRemovedCallback callback
)
{
throw new UnsupportedOperationException();
}

View File

@ -28,7 +28,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import org.apache.druid.client.InventoryView;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
@ -85,7 +85,7 @@ public class SqlModuleTest
private ServiceEmitter serviceEmitter;
@Mock
private InventoryView inventoryView;
private FilteredServerInventoryView inventoryView;
@Mock
private TimelineServerView timelineServerView;
@ -183,7 +183,7 @@ public class SqlModuleTest
binder.bind(ServiceEmitter.class).toInstance(serviceEmitter);
binder.bind(RequestLogger.class).toInstance(new NoopRequestLogger());
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(null)));
binder.bind(InventoryView.class).toInstance(inventoryView);
binder.bind(FilteredServerInventoryView.class).toInstance(inventoryView);
binder.bind(TimelineServerView.class).toInstance(timelineServerView);
binder.bind(DruidLeaderClient.class).annotatedWith(Coordinator.class).toInstance(druidLeaderClient);
binder.bind(DruidLeaderClient.class).annotatedWith(IndexingService.class).toInstance(druidLeaderClient);