Using ImmutableDruidDataSource as a key for map and set instead of DruidDataSource (#5054)

* use ImmutableDruidDataSource for map and set

* address comments

* unused import

* allow returning only ImmutableDruidDataSource in MetadataSegmentManager

* address comments

* remove TreeSet

* revert to use TreeSet
This commit is contained in:
Jihoon Son 2017-11-10 04:07:58 +09:00 committed by Roman Leventov
parent 3541b7544b
commit c11c71ab3e
34 changed files with 358 additions and 430 deletions

View File

@ -27,7 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.data.input.impl.DelimitedParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
@ -263,7 +263,7 @@ public class HadoopConverterJobTest
Thread.sleep(10);
}
manager.poll();
final DruidDataSource druidDataSource = manager.getInventoryValue(DATASOURCE);
final ImmutableDruidDataSource druidDataSource = manager.getInventoryValue(DATASOURCE);
manager.stop();
return Lists.newArrayList(druidDataSource.getSegments());
}

View File

@ -27,6 +27,7 @@ import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -126,7 +127,7 @@ public class TestServerView implements FilteredServerInventoryView, ServerView.S
}
@Override
public Iterable<DruidServer> getInventory()
public Collection<DruidServer> getInventory()
{
return null;
}

View File

@ -36,6 +36,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -204,7 +205,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
}
@Override
public Iterable<DruidServer> getInventory()
public Collection<DruidServer> getInventory()
{
return inventoryManager.getInventory();
}
@ -282,7 +283,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
return;
}
container.addDataSegment(inventory.getIdentifier(), inventory);
container.addDataSegment(inventory);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()

View File

@ -23,7 +23,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DataSource;
@ -32,6 +31,7 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -203,7 +203,7 @@ public class CoordinatorServerView implements InventoryView
}
@Override
public Iterable<DruidServer> getInventory()
public Collection<DruidServer> getInventory()
{
return baseView.getInventory();
}

View File

@ -20,37 +20,31 @@
package io.druid.client;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.druid.timeline.DataSegment;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class DruidDataSource
{
private final Object lock = new Object();
private final String name;
private final Map<String, String> properties;
private final Map<String, DataSegment> partitionNames;
private final ConcurrentSkipListSet<DataSegment> segmentsHolder;
private final ConcurrentHashMap<String, DataSegment> idToSegmentMap;
public DruidDataSource(
String name,
Map<String, String> properties
)
{
this.name = name;
this.name = Preconditions.checkNotNull(name);
this.properties = properties;
this.partitionNames = Maps.newHashMap();
this.segmentsHolder = new ConcurrentSkipListSet<DataSegment>();
this.idToSegmentMap = new ConcurrentHashMap<>();
}
@JsonProperty
@ -65,115 +59,60 @@ public class DruidDataSource
return properties;
}
@JsonProperty
public Set<DataSegment> getSegments()
public Collection<DataSegment> getSegments()
{
return Collections.unmodifiableSet(segmentsHolder);
return Collections.unmodifiableCollection(idToSegmentMap.values());
}
public DruidDataSource addSegment(String partitionName, DataSegment dataSegment)
public DruidDataSource addSegment(DataSegment dataSegment)
{
synchronized (lock) {
partitionNames.put(partitionName, dataSegment);
segmentsHolder.add(dataSegment);
}
idToSegmentMap.put(dataSegment.getIdentifier(), dataSegment);
return this;
}
public DruidDataSource addSegments(Map<String, DataSegment> partitionMap)
public DruidDataSource removePartition(String segmentId)
{
synchronized (lock) {
partitionNames.putAll(partitionMap);
segmentsHolder.addAll(partitionMap.values());
}
idToSegmentMap.remove(segmentId);
return this;
}
public DruidDataSource removePartition(String partitionName)
public DataSegment getSegment(String identifier)
{
synchronized (lock) {
DataSegment dataPart = partitionNames.remove(partitionName);
if (dataPart == null) {
return this;
}
segmentsHolder.remove(dataPart);
}
return this;
return idToSegmentMap.get(identifier);
}
public boolean isEmpty()
{
return segmentsHolder.isEmpty();
return idToSegmentMap.isEmpty();
}
public ImmutableDruidDataSource toImmutableDruidDataSource()
{
return new ImmutableDruidDataSource(
name,
ImmutableMap.copyOf(properties),
ImmutableMap.copyOf(idToSegmentMap)
);
}
@Override
public String toString()
{
synchronized (lock) {
return "DruidDataSource{" +
"properties=" + properties +
", partitions=" + segmentsHolder.toString() +
'}';
}
}
public ImmutableDruidDataSource toImmutableDruidDataSource()
{
synchronized (lock) {
return new ImmutableDruidDataSource(
name,
ImmutableMap.copyOf(properties),
ImmutableMap.copyOf(partitionNames),
ImmutableSet.copyOf(segmentsHolder)
);
}
return "DruidDataSource{" +
"properties=" + properties +
", partitions=" + idToSegmentMap.values() +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DruidDataSource that = (DruidDataSource) o;
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
if (partitionNames != null ? !partitionNames.equals(that.partitionNames) : that.partitionNames != null) {
return false;
}
if (properties != null ? !properties.equals(that.properties) : that.properties != null) {
return false;
}
if (segmentsHolder != null ? !segmentsHolder.equals(that.segmentsHolder) : that.segmentsHolder != null) {
return false;
}
return true;
throw new UnsupportedOperationException("Use ImmutableDruidDataSource instead");
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (properties != null ? properties.hashCode() : 0);
result = 31 * result + (partitionNames != null ? partitionNames.hashCode() : 0);
result = 31 * result + (segmentsHolder != null ? segmentsHolder.hashCode() : 0);
return result;
}
public DataSegment getSegment(String identifier)
{
return partitionNames.get(identifier);
throw new UnsupportedOperationException("Use ImmutableDruidDataSource instead");
}
}

View File

@ -24,13 +24,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -167,9 +167,10 @@ public class DruidServer implements Comparable
return segments.get(segmentName);
}
public DruidServer addDataSegment(String segmentId, DataSegment segment)
public DruidServer addDataSegment(DataSegment segment)
{
synchronized (lock) {
final String segmentId = segment.getIdentifier();
DataSegment shouldNotExist = segments.get(segmentId);
if (shouldNotExist != null) {
@ -188,7 +189,7 @@ public class DruidServer implements Comparable
dataSources.put(dataSourceName, dataSource);
}
dataSource.addSegment(segmentId, segment);
dataSource.addSegment(segment);
segments.put(segmentId, segment);
currSize += segment.getSize();
@ -199,9 +200,7 @@ public class DruidServer implements Comparable
public DruidServer addDataSegments(DruidServer server)
{
synchronized (lock) {
for (Map.Entry<String, DataSegment> entry : server.segments.entrySet()) {
addDataSegment(entry.getKey(), entry.getValue());
}
server.segments.values().forEach(this::addDataSegment);
}
return this;
}
@ -246,7 +245,7 @@ public class DruidServer implements Comparable
return dataSources.get(dataSource);
}
public Iterable<DruidDataSource> getDataSources()
public Collection<DruidDataSource> getDataSources()
{
return dataSources.values();
}
@ -272,7 +271,7 @@ public class DruidServer implements Comparable
DruidServer that = (DruidServer) o;
if (getName() != null ? !getName().equals(that.getName()) : that.getName() != null) {
if (!metadata.equals(that.metadata)) {
return false;
}
@ -282,7 +281,7 @@ public class DruidServer implements Comparable
@Override
public int hashCode()
{
return getName() != null ? getName().hashCode() : 0;
return metadata.hashCode();
}
@Override

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.FutureCallback;
@ -67,6 +66,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -78,6 +78,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* This class uses CuratorInventoryManager to listen for queryable server membership which serve segments(e.g. Historicals).
@ -294,18 +295,12 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
@Override
public Iterable<DruidServer> getInventory()
public Collection<DruidServer> getInventory()
{
return Iterables.transform(
servers.values(), new Function<DruidServerHolder, DruidServer>()
{
@Override
public DruidServer apply(DruidServerHolder input)
{
return input.druidServer;
}
}
);
return servers.values()
.stream()
.map(serverHolder -> serverHolder.druidServer)
.collect(Collectors.toList());
}
private void runSegmentCallbacks(
@ -657,7 +652,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
{
if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
if (druidServer.getSegment(segment.getIdentifier()) == null) {
druidServer.addDataSegment(segment.getIdentifier(), segment);
druidServer.addDataSegment(segment);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{

View File

@ -19,12 +19,13 @@
package io.druid.client;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.timeline.DataSegment;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.Objects;
/**
*/
@ -32,20 +33,17 @@ public class ImmutableDruidDataSource
{
private final String name;
private final ImmutableMap<String, String> properties;
private final ImmutableMap<String, DataSegment> partitionNames;
private final ImmutableSet<DataSegment> segmentsHolder;
private final ImmutableMap<String, DataSegment> idToSegments;
public ImmutableDruidDataSource(
String name,
ImmutableMap<String, String> properties,
ImmutableMap<String, DataSegment> partitionNames,
ImmutableSet<DataSegment> segmentsHolder
ImmutableMap<String, DataSegment> idToSegments
)
{
this.name = name;
this.name = Preconditions.checkNotNull(name);
this.properties = properties;
this.partitionNames = partitionNames;
this.segmentsHolder = segmentsHolder;
this.idToSegments = idToSegments;
}
public String getName()
@ -58,29 +56,58 @@ public class ImmutableDruidDataSource
return properties;
}
public Map<String, DataSegment> getPartitionNames()
{
return partitionNames;
}
public boolean isEmpty()
{
return segmentsHolder.isEmpty();
return idToSegments.isEmpty();
}
public Set<DataSegment> getSegments()
public Collection<DataSegment> getSegments()
{
return segmentsHolder;
return idToSegments.values();
}
public DataSegment getSegment(String segmentIdentifier)
{
return idToSegments.get(segmentIdentifier);
}
@Override
public String toString()
{
// partitionNames is intentionally ignored because it is usually large
// The detail of idToSegments is intentionally ignored because it is usually large
return "ImmutableDruidDataSource{"
+ "name='" + name
+ "', segments='" + segmentsHolder
+ "', # of segments='" + idToSegments.size()
+ "', properties='" + properties
+ "'}";
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || !getClass().equals(o.getClass())) {
return false;
}
final ImmutableDruidDataSource that = (ImmutableDruidDataSource) o;
if (!this.name.equals(that.name)) {
return false;
}
if (!this.properties.equals(that.properties)) {
return false;
}
return this.idToSegments.equals(that.idToSegments);
}
@Override
public int hashCode()
{
return Objects.hash(name, properties, idToSegments);
}
}

View File

@ -19,6 +19,7 @@
package io.druid.client;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import io.druid.server.coordination.DruidServerMetadata;
@ -43,7 +44,7 @@ public class ImmutableDruidServer
ImmutableMap<String, DataSegment> segments
)
{
this.metadata = metadata;
this.metadata = Preconditions.checkNotNull(metadata);
this.currSize = currSize;
this.segments = segments;
this.dataSources = dataSources;
@ -128,4 +129,29 @@ public class ImmutableDruidServer
+ "', sources='" + dataSources
+ "'}";
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ImmutableDruidServer that = (ImmutableDruidServer) o;
if (metadata.equals(that.metadata)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return metadata.hashCode();
}
}

View File

@ -21,12 +21,14 @@ package io.druid.client;
import io.druid.timeline.DataSegment;
import java.util.Collection;
/**
*/
public interface InventoryView
{
DruidServer getInventoryValue(String string);
Iterable<DruidServer> getInventory();
Collection<DruidServer> getInventory();
boolean isStarted();
boolean isSegmentLoadedByServer(String serverKey, DataSegment segment);
}

View File

@ -19,8 +19,6 @@
package io.druid.curator.inventory;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.java.util.common.StringUtils;
@ -36,11 +34,13 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* This class is deprecated. Use {@link io.druid.client.HttpServerInventoryView} for segment discovery.
@ -163,19 +163,12 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
return containerHolder == null ? null : containerHolder.getContainer();
}
public Iterable<ContainerClass> getInventory()
public Collection<ContainerClass> getInventory()
{
return Iterables.transform(
containers.values(),
new Function<ContainerHolder, ContainerClass>()
{
@Override
public ContainerClass apply(ContainerHolder input)
{
return input.getContainer();
}
}
);
return containers.values()
.stream()
.map(ContainerHolder::getContainer)
.collect(Collectors.toList());
}
private byte[] getZkDataForNode(String path)

View File

@ -19,9 +19,10 @@
package io.druid.metadata;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidDataSource;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
@ -44,9 +45,10 @@ public interface MetadataSegmentManager
boolean isStarted();
DruidDataSource getInventoryValue(String key);
@Nullable
ImmutableDruidDataSource getInventoryValue(String key);
Collection<DruidDataSource> getInventory();
Collection<ImmutableDruidDataSource> getInventory();
Collection<String> getAllDatasourceNames();

View File

@ -35,6 +35,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.concurrent.LifecycleLock;
import io.druid.guice.ManageLifecycle;
import io.druid.java.util.common.DateTimes;
@ -63,6 +64,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -74,6 +76,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
*/
@ -369,15 +372,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
}
@Override
public DruidDataSource getInventoryValue(String key)
@Nullable
public ImmutableDruidDataSource getInventoryValue(String key)
{
return dataSourcesRef.get().get(key);
final DruidDataSource dataSource = dataSourcesRef.get().get(key);
return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
}
@Override
public Collection<DruidDataSource> getInventory()
public Collection<ImmutableDruidDataSource> getInventory()
{
return dataSourcesRef.get().values();
return dataSourcesRef.get()
.values()
.stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList());
}
@Override
@ -495,7 +504,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
}
if (!dataSource.getSegments().contains(segment)) {
dataSource.addSegment(segment.getIdentifier(), segment);
dataSource.addSegment(segment);
}
}

View File

@ -21,6 +21,9 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Objects;
/**
*/
@ -45,7 +48,7 @@ public class DruidServerMetadata
@JsonProperty("priority") int priority
)
{
this.name = name;
this.name = Preconditions.checkNotNull(name);
this.hostAndPort = hostAndPort;
this.hostAndTlsPort = hostAndTlsPort;
this.maxSize = maxSize;
@ -118,38 +121,31 @@ public class DruidServerMetadata
DruidServerMetadata that = (DruidServerMetadata) o;
if (!name.equals(that.name)) {
return false;
}
if (!Objects.equals(hostAndPort, that.hostAndPort)) {
return false;
}
if (!Objects.equals(hostAndTlsPort, that.hostAndTlsPort)) {
return false;
}
if (maxSize != that.maxSize) {
return false;
}
if (priority != that.priority) {
if (!Objects.equals(tier, that.tier)) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
if (type != that.type) {
return false;
}
if (hostAndPort != null ? !hostAndPort.equals(that.hostAndPort) : that.hostAndPort != null) {
return false;
}
if (hostAndTlsPort != null ? !hostAndTlsPort.equals(that.hostAndTlsPort) : that.hostAndTlsPort != null) {
return false;
}
if (tier != null ? !tier.equals(that.tier) : that.tier != null) {
return false;
}
return type == that.type;
return priority == that.priority;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (hostAndPort != null ? hostAndPort.hashCode() : 0);
result = 31 * result + (hostAndTlsPort != null ? hostAndTlsPort.hashCode() : 0);
result = 31 * result + (int) (maxSize ^ (maxSize >>> 32));
result = 31 * result + (tier != null ? tier.hashCode() : 0);
result = 31 * result + (type != null ? type.hashCode() : 0);
result = 31 * result + priority;
return result;
return Objects.hash(name, hostAndPort, hostAndTlsPort, maxSize, tier, type, priority);
}
@Override

View File

@ -23,7 +23,6 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@ -41,7 +40,6 @@ import io.druid.client.ServerInventoryView;
import io.druid.client.coordinator.Coordinator;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.common.config.JacksonConfigManager;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.guice.ManageLifecycle;
@ -49,7 +47,9 @@ import io.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import io.druid.guice.annotations.Self;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.guava.Comparators;
@ -88,6 +88,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
*/
@ -290,7 +291,7 @@ public class DruidCoordinator
public Map<String, Double> getLoadStatus()
{
Map<String, Double> loadStatus = Maps.newHashMap();
for (DruidDataSource dataSource : metadataSegmentManager.getInventory()) {
for (ImmutableDruidDataSource dataSource : metadataSegmentManager.getInventory()) {
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
final int availableSegmentSize = segments.size();
@ -326,16 +327,6 @@ public class DruidCoordinator
metadataSegmentManager.removeSegment(segment.getDataSource(), segment.getIdentifier());
}
public void removeDatasource(String ds)
{
metadataSegmentManager.removeDatasource(ds);
}
public void enableDatasource(String ds)
{
metadataSegmentManager.enableDatasource(ds);
}
public String getCurrentLeader()
{
return coordLeaderSelector.getCurrentLeader();
@ -353,6 +344,7 @@ public class DruidCoordinator
if (callback != null) {
callback.execute();
}
throw new ISE("Cannot move null DataSegment");
}
String segmentName = segment.getIdentifier();
try {
@ -360,7 +352,7 @@ public class DruidCoordinator
throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentName, fromServer.getName());
}
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(segment.getDataSource());
ImmutableDruidDataSource dataSource = metadataSegmentManager.getInventoryValue(segment.getDataSource());
if (dataSource == null) {
throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentName);
}
@ -460,21 +452,12 @@ public class DruidCoordinator
return availableSegments;
}
public Iterable<DataSegment> getAvailableDataSegments()
private List<DataSegment> getAvailableDataSegments()
{
return Iterables.concat(
Iterables.transform(
metadataSegmentManager.getInventory(),
new Function<DruidDataSource, Iterable<DataSegment>>()
{
@Override
public Iterable<DataSegment> apply(DruidDataSource input)
{
return input.getSegments();
}
}
)
);
return metadataSegmentManager.getInventory()
.stream()
.flatMap(source -> source.getSegments().stream())
.collect(Collectors.toList());
}
@LifecycleStart

View File

@ -22,7 +22,7 @@ package io.druid.server.coordinator;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.java.util.common.DateTimes;
import io.druid.metadata.MetadataRuleManager;
import io.druid.timeline.DataSegment;
@ -30,8 +30,10 @@ import org.joda.time.DateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*/
@ -41,7 +43,7 @@ public class DruidCoordinatorRuntimeParams
private final DruidCluster druidCluster;
private final MetadataRuleManager databaseRuleManager;
private final SegmentReplicantLookup segmentReplicantLookup;
private final Set<DruidDataSource> dataSources;
private final Set<ImmutableDruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ReplicationThrottler replicationManager;
@ -56,7 +58,7 @@ public class DruidCoordinatorRuntimeParams
DruidCluster druidCluster,
MetadataRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Set<DruidDataSource> dataSources,
Set<ImmutableDruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
@ -102,7 +104,7 @@ public class DruidCoordinatorRuntimeParams
return segmentReplicantLookup;
}
public Set<DruidDataSource> getDataSources()
public Set<ImmutableDruidDataSource> getDataSources()
{
return dataSources;
}
@ -201,7 +203,7 @@ public class DruidCoordinatorRuntimeParams
private DruidCluster druidCluster;
private MetadataRuleManager databaseRuleManager;
private SegmentReplicantLookup segmentReplicantLookup;
private final Set<DruidDataSource> dataSources;
private final Set<ImmutableDruidDataSource> dataSources;
private final Set<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager;
@ -217,8 +219,8 @@ public class DruidCoordinatorRuntimeParams
this.druidCluster = null;
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = Sets.newHashSet();
this.availableSegments = Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR);
this.dataSources = new HashSet<>();
this.availableSegments = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR);
this.loadManagementPeons = Maps.newHashMap();
this.replicationManager = null;
this.emitter = null;
@ -232,7 +234,7 @@ public class DruidCoordinatorRuntimeParams
DruidCluster cluster,
MetadataRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Set<DruidDataSource> dataSources,
Set<ImmutableDruidDataSource> dataSources,
Set<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
@ -301,7 +303,7 @@ public class DruidCoordinatorRuntimeParams
return this;
}
public Builder withDatasources(Collection<DruidDataSource> dataSourcesCollection)
public Builder withDatasources(Collection<ImmutableDruidDataSource> dataSourcesCollection)
{
dataSources.addAll(Collections.unmodifiableCollection(dataSourcesCollection));
return this;

View File

@ -21,7 +21,6 @@ package io.druid.server.coordinator.helper;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidMetrics;
@ -259,7 +258,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
final Stream<DataSegment> allSegments = params
.getDataSources()
.stream()
.flatMap((final DruidDataSource dataSource) -> dataSource.getSegments().stream());
.flatMap(dataSource -> dataSource.getSegments().stream());
allSegments
.collect(Collectors.groupingBy(DataSegment::getDataSource))

View File

@ -29,6 +29,7 @@ import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.client.CoordinatorServerView;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.SegmentLoadInfo;
import io.druid.client.indexing.IndexingServiceClient;
@ -68,7 +69,9 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
*/
@ -108,33 +111,27 @@ public class DatasourcesResource
)
{
Response.ResponseBuilder builder = Response.ok();
final Set<DruidDataSource> datasources = InventoryViewUtils.getSecuredDataSources(
final Set<ImmutableDruidDataSource> datasources = InventoryViewUtils.getSecuredDataSources(
req,
serverInventoryView,
authorizerMapper
);
final Object entity;
if (full != null) {
return builder.entity(datasources).build();
entity = datasources;
} else if (simple != null) {
return builder.entity(
Lists.newArrayList(
Iterables.transform(
datasources,
(DruidDataSource dataSource) -> makeSimpleDatasource(dataSource)
)
)
).build();
entity = datasources.stream()
.map(this::makeSimpleDatasource)
.collect(Collectors.toList());
} else {
entity = datasources.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList());
}
return builder.entity(
Lists.newArrayList(
Iterables.transform(
datasources,
(DruidDataSource dataSource) -> dataSource.getName()
)
)
).build();
return builder.entity(entity).build();
}
@GET
@ -146,7 +143,7 @@ public class DatasourcesResource
@QueryParam("full") final String full
)
{
DruidDataSource dataSource = getDataSource(dataSourceName);
ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
if (dataSource == null) {
return Response.noContent().build();
@ -270,7 +267,7 @@ public class DatasourcesResource
@QueryParam("full") String full
)
{
final DruidDataSource dataSource = getDataSource(dataSourceName);
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
if (dataSource == null) {
return Response.noContent().build();
@ -335,7 +332,7 @@ public class DatasourcesResource
@QueryParam("full") String full
)
{
final DruidDataSource dataSource = getDataSource(dataSourceName);
final ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
final Interval theInterval = Intervals.of(interval.replace("_", "/"));
if (dataSource == null) {
@ -404,7 +401,7 @@ public class DatasourcesResource
@QueryParam("full") String full
)
{
DruidDataSource dataSource = getDataSource(dataSourceName);
ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
if (dataSource == null) {
return Response.noContent().build();
}
@ -431,7 +428,7 @@ public class DatasourcesResource
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = getDataSource(dataSourceName);
ImmutableDruidDataSource dataSource = getDataSource(dataSourceName);
if (dataSource == null) {
return Response.noContent().build();
}
@ -496,40 +493,34 @@ public class DatasourcesResource
return Response.ok(retVal).build();
}
private DruidDataSource getDataSource(final String dataSourceName)
@Nullable
private ImmutableDruidDataSource getDataSource(final String dataSourceName)
{
Iterable<DruidDataSource> dataSources =
Iterables.concat(
Iterables.transform(
serverInventoryView.getInventory(),
(DruidServer input) -> input.getDataSource(dataSourceName)
)
);
List<ImmutableDruidDataSource> dataSources = serverInventoryView
.getInventory()
.stream()
.map(server -> server.getDataSource(dataSourceName))
.filter(Objects::nonNull)
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList());
List<DruidDataSource> validDataSources = Lists.newArrayList();
for (DruidDataSource dataSource : dataSources) {
if (dataSource != null) {
validDataSources.add(dataSource);
}
}
if (validDataSources.isEmpty()) {
if (dataSources.isEmpty()) {
return null;
}
Map<String, DataSegment> segmentMap = Maps.newHashMap();
for (DruidDataSource dataSource : validDataSources) {
if (dataSource != null) {
Iterable<DataSegment> segments = dataSource.getSegments();
for (DataSegment segment : segments) {
segmentMap.put(segment.getIdentifier(), segment);
}
for (ImmutableDruidDataSource dataSource : dataSources) {
Iterable<DataSegment> segments = dataSource.getSegments();
for (DataSegment segment : segments) {
segmentMap.put(segment.getIdentifier(), segment);
}
}
return new DruidDataSource(
return new ImmutableDruidDataSource(
dataSourceName,
ImmutableMap.of()
).addSegments(segmentMap);
ImmutableMap.of(),
ImmutableMap.copyOf(segmentMap)
);
}
private Pair<DataSegment, Set<String>> getSegment(String segmentId)
@ -551,7 +542,7 @@ public class DatasourcesResource
return new Pair<>(theSegment, servers);
}
private Map<String, Object> makeSimpleDatasource(DruidDataSource input)
private Map<String, Object> makeSimpleDatasource(ImmutableDruidDataSource input)
{
return new ImmutableMap.Builder<String, Object>()
.put("name", input.getName())

View File

@ -21,7 +21,7 @@ package io.druid.server.http;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.InventoryView;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.MapUtils;
@ -70,14 +70,14 @@ public class IntervalsResource
public Response getIntervals(@Context final HttpServletRequest req)
{
final Comparator<Interval> comparator = Comparators.inverse(Comparators.intervalsByStartThenEnd());
final Set<DruidDataSource> datasources = InventoryViewUtils.getSecuredDataSources(
final Set<ImmutableDruidDataSource> datasources = InventoryViewUtils.getSecuredDataSources(
req,
serverInventoryView,
authorizerMapper
);
final Map<Interval, Map<String, Map<String, Object>>> retVal = Maps.newTreeMap(comparator);
for (DruidDataSource dataSource : datasources) {
for (ImmutableDruidDataSource dataSource : datasources) {
for (DataSegment dataSegment : dataSource.getSegments()) {
Map<String, Map<String, Object>> interval = retVal.get(dataSegment.getInterval());
if (interval == null) {
@ -102,7 +102,7 @@ public class IntervalsResource
)
{
final Interval theInterval = Intervals.of(interval.replace("_", "/"));
final Set<DruidDataSource> datasources = InventoryViewUtils.getSecuredDataSources(
final Set<ImmutableDruidDataSource> datasources = InventoryViewUtils.getSecuredDataSources(
req,
serverInventoryView,
authorizerMapper
@ -112,7 +112,7 @@ public class IntervalsResource
if (full != null) {
final Map<Interval, Map<String, Map<String, Object>>> retVal = Maps.newTreeMap(comparator);
for (DruidDataSource dataSource : datasources) {
for (ImmutableDruidDataSource dataSource : datasources) {
for (DataSegment dataSegment : dataSource.getSegments()) {
if (theInterval.contains(dataSegment.getInterval())) {
Map<String, Map<String, Object>> dataSourceInterval = retVal.get(dataSegment.getInterval());
@ -130,7 +130,7 @@ public class IntervalsResource
if (simple != null) {
final Map<Interval, Map<String, Object>> retVal = Maps.newHashMap();
for (DruidDataSource dataSource : datasources) {
for (ImmutableDruidDataSource dataSource : datasources) {
for (DataSegment dataSegment : dataSource.getSegments()) {
if (theInterval.contains(dataSegment.getInterval())) {
Map<String, Object> properties = retVal.get(dataSegment.getInterval());
@ -152,7 +152,7 @@ public class IntervalsResource
}
final Map<String, Object> retVal = Maps.newHashMap();
for (DruidDataSource dataSource : datasources) {
for (ImmutableDruidDataSource dataSource : datasources) {
for (DataSegment dataSegment : dataSource.getSegments()) {
if (theInterval.contains(dataSegment.getInterval())) {
retVal.put("size", MapUtils.getLong(retVal, "size", 0L) + dataSegment.getSize());
@ -166,7 +166,7 @@ public class IntervalsResource
private void setProperties(
final Map<Interval, Map<String, Map<String, Object>>> retVal,
DruidDataSource dataSource, DataSegment dataSegment
ImmutableDruidDataSource dataSource, DataSegment dataSegment
)
{
Map<String, Object> properties = retVal.get(dataSegment.getInterval()).get(dataSource.getName());

View File

@ -19,60 +19,41 @@
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.InventoryView;
import io.druid.java.util.common.ISE;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import javax.servlet.http.HttpServletRequest;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
public class InventoryViewUtils
public interface InventoryViewUtils
{
public static Set<DruidDataSource> getDataSources(InventoryView serverInventoryView)
static Comparator<ImmutableDruidDataSource> comparingByName()
{
TreeSet<DruidDataSource> dataSources = Sets.newTreeSet(
new Comparator<DruidDataSource>()
{
@Override
public int compare(DruidDataSource druidDataSource, DruidDataSource druidDataSource1)
{
return druidDataSource.getName().compareTo(druidDataSource1.getName());
}
}
);
dataSources.addAll(
Lists.newArrayList(
Iterables.concat(
Iterables.transform(
serverInventoryView.getInventory(),
new Function<DruidServer, Iterable<DruidDataSource>>()
{
@Override
public Iterable<DruidDataSource> apply(DruidServer input)
{
return input.getDataSources();
}
}
)
)
)
);
return dataSources;
return Comparator.comparing(ImmutableDruidDataSource::getName);
}
public static Set<DruidDataSource> getSecuredDataSources(
static SortedSet<ImmutableDruidDataSource> getDataSources(InventoryView serverInventoryView)
{
return serverInventoryView.getInventory()
.stream()
.flatMap(server -> server.getDataSources().stream())
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(
() -> new TreeSet<>(comparingByName()),
TreeSet::add,
TreeSet::addAll
);
}
static SortedSet<ImmutableDruidDataSource> getSecuredDataSources(
HttpServletRequest request,
InventoryView inventoryView,
final AuthorizerMapper authorizerMapper
@ -82,17 +63,16 @@ public class InventoryViewUtils
throw new ISE("No authorization mapper found");
}
return ImmutableSet.copyOf(
AuthorizationUtils.filterAuthorizedResources(
request,
getDataSources(inventoryView),
datasource -> {
return Lists.newArrayList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasource.getName())
);
},
authorizerMapper
)
Iterable<ImmutableDruidDataSource> filteredResources = AuthorizationUtils.filterAuthorizedResources(
request,
getDataSources(inventoryView),
datasource -> Lists.newArrayList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasource.getName())
),
authorizerMapper
);
SortedSet<ImmutableDruidDataSource> set = new TreeSet<>(comparingByName());
filteredResources.forEach(set::add);
return Collections.unmodifiableSortedSet(set);
}
}

View File

@ -20,20 +20,19 @@
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.http.security.DatasourceResourceFilter;
import io.druid.server.security.AuthConfig;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.ResourceAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -49,6 +48,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@ -85,22 +85,13 @@ public class MetadataResource
@Context final HttpServletRequest req
)
{
final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getInventory();
final Set<String> dataSourceNamesPreAuth;
if (includeDisabled != null) {
dataSourceNamesPreAuth = Sets.newTreeSet(metadataSegmentManager.getAllDatasourceNames());
} else {
dataSourceNamesPreAuth = Sets.newTreeSet(
Iterables.transform(
metadataSegmentManager.getInventory(),
new Function<DruidDataSource, String>()
{
@Override
public String apply(DruidDataSource input)
{
return input.getName();
}
}
)
Iterables.transform(druidDataSources, ImmutableDruidDataSource::getName)
);
}
@ -123,17 +114,7 @@ public class MetadataResource
// Always use dataSourceNamesPostAuth to determine the set of returned dataSources
if (full != null && includeDisabled == null) {
return Response.ok().entity(
Collections2.filter(
metadataSegmentManager.getInventory(),
new Predicate<DruidDataSource>()
{
@Override
public boolean apply(DruidDataSource input)
{
return dataSourceNamesPostAuth.contains(input.getName());
}
}
)
Collections2.filter(druidDataSources, dataSource -> dataSourceNamesPostAuth.contains(dataSource.getName()))
).build();
} else {
return Response.ok().entity(dataSourceNamesPostAuth).build();
@ -148,7 +129,7 @@ public class MetadataResource
@PathParam("dataSourceName") final String dataSourceName
)
{
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
ImmutableDruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -165,7 +146,7 @@ public class MetadataResource
@QueryParam("full") String full
)
{
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
ImmutableDruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -237,7 +218,7 @@ public class MetadataResource
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
ImmutableDruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}

View File

@ -124,7 +124,7 @@ public class MetadataSegmentManagerTest
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
manager.getInventoryValue("wikipedia").getSegments()
ImmutableSet.copyOf(manager.getInventoryValue("wikipedia").getSegments())
);
}

View File

@ -380,7 +380,7 @@ public class ClientInfoResourceTest
.metrics(metrics)
.size(1)
.build();
server.addDataSegment(segment.getIdentifier(), segment);
server.addDataSegment(segment);
ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()));
timeline.add(Intervals.of(interval), version, new SingleElementPartitionChunk<ServerSelector>(ss));
}
@ -404,7 +404,7 @@ public class ClientInfoResourceTest
.shardSpec(shardSpec)
.size(1)
.build();
server.addDataSegment(segment.getIdentifier(), segment);
server.addDataSegment(segment);
ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()));
timeline.add(Intervals.of(interval), version, shardSpec.createChunk(ss));
}

View File

@ -75,15 +75,13 @@ public class DruidClusterTest
new ImmutableDruidDataSource(
"src1",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
ImmutableMap.of()
),
"src2",
new ImmutableDruidDataSource(
"src2",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
ImmutableMap.of()
)
);

View File

@ -363,7 +363,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment);
normServer.addDataSegment(availableSegment);
}
DruidCluster druidCluster = new DruidCluster(
@ -583,7 +583,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment segment : availableSegments) {
server.addDataSegment(segment.getIdentifier(), segment);
server.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
@ -653,7 +653,7 @@ public class DruidCoordinatorRuleRunnerTest
"normal",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
server1.addDataSegment(availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
@ -665,7 +665,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
server2.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
@ -742,7 +742,7 @@ public class DruidCoordinatorRuleRunnerTest
"hot",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
server1.addDataSegment(availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
@ -753,7 +753,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
server2.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
@ -841,7 +841,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
server2.addDataSegment(segment);
}
DruidCluster druidCluster = new DruidCluster(
null,
@ -913,7 +913,7 @@ public class DruidCoordinatorRuleRunnerTest
"normal",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
server1.addDataSegment(availableSegments.get(0));
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
@ -923,7 +923,7 @@ public class DruidCoordinatorRuleRunnerTest
"normal",
0
);
server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
server2.addDataSegment(availableSegments.get(1));
DruidServer server3 = new DruidServer(
"serverNorm3",
"hostNorm3",
@ -933,8 +933,8 @@ public class DruidCoordinatorRuleRunnerTest
"normal",
0
);
server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
server3.addDataSegment(availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2));
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
@ -1248,7 +1248,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server1.addDataSegment(availableSegment.getIdentifier(), availableSegment);
server1.addDataSegment(availableSegment);
}
DruidServer server2 = new DruidServer(
"serverNorm2",
@ -1260,7 +1260,7 @@ public class DruidCoordinatorRuleRunnerTest
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server2.addDataSegment(availableSegment.getIdentifier(), availableSegment);
server2.addDataSegment(availableSegment);
}
DruidCluster druidCluster = new DruidCluster(

View File

@ -30,12 +30,12 @@ import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.SingleServerInventoryView;
import io.druid.common.config.JacksonConfigManager;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
@ -237,7 +237,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.expect(loadQueuePeon.getSegmentsToDrop()).andReturn(new HashSet<>()).once();
EasyMock.replay(loadQueuePeon);
DruidDataSource druidDataSource = EasyMock.createNiceMock(DruidDataSource.class);
ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyString())).andReturn(segment);
EasyMock.replay(druidDataSource);
EasyMock.expect(databaseSegmentManager.getInventoryValue(EasyMock.anyString())).andReturn(druidDataSource);
@ -275,7 +275,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
coordinator.moveSegment(
druidServer.toImmutableDruidServer(),
druidServer2.toImmutableDruidServer(),
segment, null
segment,
null
);
LoadPeonCallback loadCallback = loadCallbackCapture.getValue();
@ -322,11 +323,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
0x9,
0
);
druidDataSources[0].addSegment("0", dataSegment);
druidDataSources[0].addSegment(dataSegment);
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
ImmutableList.of(druidDataSources[0])
ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.replay(databaseSegmentManager);
ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
@ -364,7 +365,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
if (assignSegmentLatch.getCount() > 0) {
//Coordinator should try to assign segment to druidServer historical
//Simulate historical loading segment
druidServer.addDataSegment(dataSegment.getIdentifier(), dataSegment);
druidServer.addDataSegment(dataSegment);
assignSegmentLatch.countDown();
} else {
Assert.fail("The same segment is assigned to the same server multiple times");
@ -431,11 +432,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
};
for (DataSegment segment : segments) {
dataSource.addSegment(segment.getIdentifier(), segment);
dataSource.addSegment(segment);
}
EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
ImmutableList.of(dataSource)
ImmutableList.of(dataSource.toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.replay(databaseSegmentManager);
Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();

View File

@ -21,7 +21,6 @@ package io.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.Intervals;
@ -67,15 +66,13 @@ public class ServerHolderTest
new ImmutableDruidDataSource(
"src1",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
ImmutableMap.of()
),
"src2",
new ImmutableDruidDataSource(
"src2",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
ImmutableMap.of()
)
);

View File

@ -138,7 +138,7 @@ public class CachingCostBalancerStrategyTest
{
DruidServer druidServer = new DruidServer(name, host, null, maxSize, ServerType.HISTORICAL, "normal", 0);
createDataSegments(numberOfSegments, random, referenceTime)
.forEach(segment -> druidServer.addDataSegment(segment.getIdentifier(), segment));
.forEach(druidServer::addDataSegment);
return new ServerHolder(
druidServer.toImmutableDruidServer(),
new LoadQueuePeonTester()

View File

@ -114,7 +114,7 @@ public class BroadcastDistributionRuleTest
ServerType.HISTORICAL,
"hot",
0
).addDataSegment(smallSegment.getIdentifier(), smallSegment)
).addDataSegment(smallSegment)
.toImmutableDruidServer(),
new LoadQueuePeonTester()
);
@ -129,7 +129,7 @@ public class BroadcastDistributionRuleTest
ServerType.HISTORICAL,
"hot",
0
).addDataSegment(largeSegments.get(0).getIdentifier(), largeSegments.get(0))
).addDataSegment(largeSegments.get(0))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
)
@ -144,7 +144,7 @@ public class BroadcastDistributionRuleTest
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
).addDataSegment(largeSegments.get(1).getIdentifier(), largeSegments.get(1))
).addDataSegment(largeSegments.get(1))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
)
@ -159,7 +159,7 @@ public class BroadcastDistributionRuleTest
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
).addDataSegment(largeSegments.get(2).getIdentifier(), largeSegments.get(2))
).addDataSegment(largeSegments.get(2))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
)
@ -175,7 +175,7 @@ public class BroadcastDistributionRuleTest
ServerType.HISTORICAL,
"hot",
0
).addDataSegment(largeSegments2.get(0).getIdentifier(), largeSegments2.get(0))
).addDataSegment(largeSegments2.get(0))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
)
@ -190,7 +190,7 @@ public class BroadcastDistributionRuleTest
ServerType.HISTORICAL,
DruidServer.DEFAULT_TIER,
0
).addDataSegment(largeSegments2.get(1).getIdentifier(), largeSegments2.get(1))
).addDataSegment(largeSegments2.get(1))
.toImmutableDruidServer(),
new LoadQueuePeonTester()
)

View File

@ -303,7 +303,7 @@ public class LoadRuleTest
"hot",
0
);
server1.addDataSegment(segment.getIdentifier(), segment);
server1.addDataSegment(segment);
DruidServer server2 = new DruidServer(
"serverNorm",
"hostNorm",
@ -313,7 +313,7 @@ public class LoadRuleTest
DruidServer.DEFAULT_TIER,
0
);
server2.addDataSegment(segment.getIdentifier(), segment);
server2.addDataSegment(segment);
DruidServer server3 = new DruidServer(
"serverNormNotServing",
"hostNorm",
@ -457,8 +457,8 @@ public class LoadRuleTest
"hot",
0
);
server1.addDataSegment(segment.getIdentifier(), segment);
server2.addDataSegment(segment.getIdentifier(), segment);
server1.addDataSegment(segment);
server2.addDataSegment(segment);
DruidCluster druidCluster = new DruidCluster(
null,

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet;
import io.druid.client.CoordinatorServerView;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.java.util.common.Intervals;
import io.druid.server.coordination.ServerType;
@ -46,11 +47,13 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
public class DatasourcesResourceTest
{
@ -65,7 +68,7 @@ public class DatasourcesResourceTest
{
request = EasyMock.createStrictMock(HttpServletRequest.class);
inventoryView = EasyMock.createStrictMock(CoordinatorServerView.class);
server = EasyMock.createStrictMock(DruidServer.class);
server = EasyMock.niceMock(DruidServer.class);
dataSegmentList = new ArrayList<>();
dataSegmentList.add(
new DataSegment(
@ -108,10 +111,10 @@ public class DatasourcesResourceTest
);
listDataSources = new ArrayList<>();
listDataSources.add(
new DruidDataSource("datasource1", new HashMap()).addSegment("part1", dataSegmentList.get(0))
new DruidDataSource("datasource1", new HashMap<>()).addSegment(dataSegmentList.get(0))
);
listDataSources.add(
new DruidDataSource("datasource2", new HashMap()).addSegment("part1", dataSegmentList.get(1))
new DruidDataSource("datasource2", new HashMap<>()).addSegment(dataSegmentList.get(1))
);
}
@ -155,12 +158,13 @@ public class DatasourcesResourceTest
AuthTestUtils.TEST_AUTHORIZER_MAPPER
);
Response response = datasourcesResource.getQueryableDataSources("full", null, request);
Set<DruidDataSource> result = (Set<DruidDataSource>) response.getEntity();
DruidDataSource[] resultantDruidDataSources = new DruidDataSource[result.size()];
result.toArray(resultantDruidDataSources);
Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(2, resultantDruidDataSources.length);
Assert.assertArrayEquals(listDataSources.toArray(), resultantDruidDataSources);
Assert.assertEquals(2, result.size());
Assert.assertEquals(
listDataSources.stream().map(DruidDataSource::toImmutableDruidDataSource).collect(Collectors.toSet()),
new HashSet<>(result)
);
response = datasourcesResource.getQueryableDataSources(null, null, request);
List<String> result1 = (List<String>) response.getEntity();
@ -236,13 +240,16 @@ public class DatasourcesResourceTest
authMapper
);
Response response = datasourcesResource.getQueryableDataSources("full", null, request);
Set<DruidDataSource> result = (Set<DruidDataSource>) response.getEntity();
DruidDataSource[] resultantDruidDataSources = new DruidDataSource[result.size()];
result.toArray(resultantDruidDataSources);
Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(1, resultantDruidDataSources.length);
Assert.assertArrayEquals(listDataSources.subList(0, 1).toArray(), resultantDruidDataSources);
Assert.assertEquals(1, result.size());
Assert.assertEquals(
listDataSources.subList(0, 1).stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toSet()),
new HashSet<>(result)
);
response = datasourcesResource.getQueryableDataSources(null, null, request);
List<String> result1 = (List<String>) response.getEntity();
@ -267,7 +274,6 @@ public class DatasourcesResourceTest
EasyMock.expect(server.getDataSource("datasource2")).andReturn(
listDataSources.get(1)
).atLeastOnce();
EasyMock.expect(server.getTier()).andReturn(null).atLeastOnce();
EasyMock.expect(inventoryView.getInventory()).andReturn(
ImmutableList.of(server)
).atLeastOnce();
@ -314,9 +320,9 @@ public class DatasourcesResourceTest
EasyMock.replay(inventoryView, server);
DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null, new AuthConfig(), null);
Response response = datasourcesResource.getTheDataSource("datasource1", "full");
DruidDataSource result = (DruidDataSource) response.getEntity();
ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(dataSource1, result);
Assert.assertEquals(dataSource1.toImmutableDruidDataSource(), result);
EasyMock.verify(inventoryView, server);
}
@ -337,9 +343,8 @@ public class DatasourcesResourceTest
@Test
public void testSimpleGetTheDataSource() throws Exception
{
DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap());
DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap<>());
dataSource1.addSegment(
"partition",
new DataSegment("datasegment1", Intervals.of("2010-01-01/P1D"), null, null, null, null, null, 0x9, 10)
);
EasyMock.expect(server.getDataSource("datasource1")).andReturn(
@ -410,9 +415,9 @@ public class DatasourcesResourceTest
public void testGetSegmentDataSourceIntervals()
{
server = new DruidServer("who", "host", null, 1234, ServerType.HISTORICAL, "tier1", 0);
server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0));
server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1));
server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2));
server.addDataSegment(dataSegmentList.get(0));
server.addDataSegment(dataSegmentList.get(1));
server.addDataSegment(dataSegmentList.get(2));
EasyMock.expect(inventoryView.getInventory()).andReturn(
ImmutableList.of(server)
).atLeastOnce();
@ -460,9 +465,9 @@ public class DatasourcesResourceTest
public void testGetSegmentDataSourceSpecificInterval()
{
server = new DruidServer("who", "host", null, 1234, ServerType.HISTORICAL, "tier1", 0);
server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0));
server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1));
server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2));
server.addDataSegment(dataSegmentList.get(0));
server.addDataSegment(dataSegmentList.get(1));
server.addDataSegment(dataSegmentList.get(2));
EasyMock.expect(inventoryView.getInventory()).andReturn(
ImmutableList.of(server)
).atLeastOnce();

View File

@ -97,9 +97,9 @@ public class IntervalsResourceTest
)
);
server = new DruidServer("who", "host", null, 1234, ServerType.HISTORICAL, "tier1", 0);
server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0));
server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1));
server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2));
server.addDataSegment(dataSegmentList.get(0));
server.addDataSegment(dataSegmentList.get(1));
server.addDataSegment(dataSegmentList.get(2));
}
@Test

View File

@ -51,7 +51,7 @@ public class ServersResourceTest
.version("v0")
.size(1L)
.build();
dummyServer.addDataSegment(segment.getIdentifier(), segment);
dummyServer.addDataSegment(segment);
CoordinatorServerView inventoryView = EasyMock.createMock(CoordinatorServerView.class);
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(dummyServer)).anyTimes();

View File

@ -44,6 +44,7 @@ import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -119,7 +120,7 @@ public class CliRealtimeExample extends ServerRunnable
}
@Override
public Iterable<DruidServer> getInventory()
public Collection<DruidServer> getInventory()
{
return ImmutableList.of();
}