Merge pull request #2056 from guobingkun/fix_server_selector2

Bug fix: stale segment in ServerSelector
This commit is contained in:
Fangjin Yang 2015-12-07 16:57:33 -08:00
commit 56fd48c0e4
6 changed files with 109 additions and 13 deletions

View File

@ -210,7 +210,7 @@ public class BrokerServerView implements TimelineServerView
if (queryableDruidServer == null) {
queryableDruidServer = addServer(baseView.getInventoryValue(server.getName()));
}
selector.addServer(queryableDruidServer);
selector.addServerAndUpdateSegment(queryableDruidServer, segment);
}
}

View File

@ -25,6 +25,7 @@ import io.druid.timeline.DataSegment;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ -35,28 +36,30 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment;
private final TierSelectorStrategy strategy;
private final AtomicReference<DataSegment> segment;
public ServerSelector(
DataSegment segment,
TierSelectorStrategy strategy
)
{
this.segment = segment;
this.segment = new AtomicReference<DataSegment>(segment);
this.strategy = strategy;
}
public DataSegment getSegment()
{
return segment;
return segment.get();
}
public void addServer(
QueryableDruidServer server
public void addServerAndUpdateSegment(
QueryableDruidServer server, DataSegment segment
)
{
synchronized (this) {
this.segment.set(segment);
servers.add(server);
}
}
@ -88,7 +91,7 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
theServers.add(server);
}
return strategy.pick(prioritizedServers, segment);
return strategy.pick(prioritizedServers, segment.get());
}
}
}

View File

@ -558,7 +558,7 @@ public class CachingClusteredClientTest
dataSegment,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
selector.addServer(new QueryableDruidServer(lastServer, null));
selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment);
timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector));
client.run(query, context);
@ -1714,7 +1714,7 @@ public class CachingClusteredClientTest
expectation.getSegment(),
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
selector.addServer(new QueryableDruidServer(lastServer, null));
selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), selector.getSegment());
final PartitionChunk<ServerSelector> chunk;
if (numChunks == 1) {

View File

@ -136,12 +136,12 @@ public class DirectDruidClientTest
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client2
);
serverSelector.addServer(queryableDruidServer2);
serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String, List> context = Maps.newHashMap();
@ -238,7 +238,7 @@ public class DirectDruidClientTest
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String, List> context = Maps.newHashMap();

View File

@ -0,0 +1,93 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.selector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class ServerSelectorTest
{
@Test
public void testSegmentUpdate() throws Exception
{
final ServerSelector selector = new ServerSelector(
DataSegment.builder()
.dataSource("test_broker_server_view")
.interval(new Interval("2012/2013"))
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
"somewhere"
)
)
.version("v1")
.dimensions(ImmutableList.<String>of())
.metrics(ImmutableList.<String>of())
.shardSpec(new NoneShardSpec())
.binaryVersion(9)
.size(0)
.build(),
EasyMock.createMock(TierSelectorStrategy.class)
);
selector.addServerAndUpdateSegment(
EasyMock.createMock(QueryableDruidServer.class),
DataSegment.builder()
.dataSource(
"test_broker_server_view")
.interval(new Interval(
"2012/2013"))
.loadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
"somewhere"
)
)
.version("v1")
.dimensions(
ImmutableList.<String>of(
"a",
"b",
"c"
))
.metrics(
ImmutableList.<String>of())
.shardSpec(new NoneShardSpec())
.binaryVersion(9)
.size(0)
.build()
);
Assert.assertEquals(ImmutableList.of("a", "b", "c"), selector.getSegment().getDimensions());
}
}

View File

@ -130,7 +130,7 @@ public class TierSelectorStrategyTest
tierSelectorStrategy
);
for (QueryableDruidServer server : servers) {
serverSelector.addServer(server);
serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment());
}
Assert.assertEquals(expectedSelection, serverSelector.pick());
}