fixes HttpServerInventoryView to call server/segment callbacks correctly and Unit Tests for the class (#4767)

* fixes HttpServerInventoryView to call server/segment callbacks correctly and Unit Tests for the class

* fix checkstyle and forbidden-api errors

* HttpServerInventoryView to finish start() only after server inventory is initialized

* fix compilation errors

* address review comments

* add exponential backoff instead of fixed 5 secs on successive failures

* update test to exercise server fail scenarios

* use AtomicInteger for requestNum and increment only once
This commit is contained in:
Himanshu 2017-09-13 14:24:19 -05:00 committed by GitHub
parent 39a0b171e8
commit 7919469de6
7 changed files with 518 additions and 151 deletions

View File

@ -81,18 +81,27 @@ public class RetryUtils
return retry(f, shouldRetry, 0, maxTries);
}
private static void awaitNextRetry(final Throwable e, final int nTry, final boolean quiet) throws InterruptedException
private static void awaitNextRetry(Throwable e, final int nTry, final boolean quiet) throws InterruptedException
{
final long sleepMillis = nextRetrySleepMillis(nTry);
if (quiet) {
log.debug(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
} else {
log.warn(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
}
Thread.sleep(sleepMillis);
}
public static long nextRetrySleepMillis(final int nTry) throws InterruptedException
{
final long baseSleepMillis = 1000;
final long maxSleepMillis = 60000;
final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry - 1))
* fuzzyMultiplier);
if (quiet) {
log.debug(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
} else {
log.warn(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
}
Thread.sleep(sleepMillis);
return sleepMillis;
}
}

View File

@ -25,7 +25,6 @@ import com.google.common.base.Predicates;
import com.metamx.http.client.HttpClient;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
@ -47,11 +46,6 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
@Smile
ObjectMapper smileMapper = null;
@JacksonInject
@NotNull
@Json
ObjectMapper jsonMapper = null;
@JacksonInject
@NotNull
HttpServerInventoryViewConfig config = null;
@ -64,7 +58,7 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
public HttpServerInventoryView get()
{
return new HttpServerInventoryView(
jsonMapper, smileMapper, httpClient,
smileMapper, httpClient,
druidNodeDiscoveryProvider,
Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue(),
config

View File

@ -24,9 +24,9 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -45,18 +45,18 @@ import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestHistory;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import io.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpHeaders;
@ -77,9 +77,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class uses CuratorInventoryManager to listen for queryable server membership which serve segments(e.g. Historicals).
@ -114,9 +114,10 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private final ObjectMapper smileMapper;
private final HttpServerInventoryViewConfig config;
private final CountDownLatch inventoryInitializationLatch = new CountDownLatch(1);
@Inject
public HttpServerInventoryView(
final @Json ObjectMapper jsonMapper,
final @Smile ObjectMapper smileMapper,
final @Global HttpClient httpClient,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
@ -182,17 +183,16 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
private volatile boolean initialized = false;
private final AtomicBoolean initialized = new AtomicBoolean(false);
@Override
public void nodesAdded(List<DiscoveryDruidNode> nodes)
{
nodes.forEach(
node -> serverAddedOrUpdated(toDruidServer(node))
node -> serverAdded(toDruidServer(node))
);
if (!initialized) {
initialized = true;
if (!initialized.getAndSet(true)) {
queue.add(HttpServerInventoryView.this::serverInventoryInitialized);
}
}
@ -221,12 +221,19 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
);
log.info("Started HttpServerInventoryView.");
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
log.info("Waiting for Server Inventory Initialization...");
while (!inventoryInitializationLatch.await(1, TimeUnit.MINUTES)) {
log.info("Still waiting for Server Inventory Initialization...");
}
log.info("Started HttpServerInventoryView.");
}
}
@ -256,8 +263,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
Executor exec, SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
segmentCallbacks.put(callback, exec);
segmentPredicates.put(callback, filter);
SegmentCallback filteringSegmentCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
segmentCallbacks.put(filteringSegmentCallback, exec);
segmentPredicates.put(filteringSegmentCallback, filter);
finalPredicate = Predicates.or(
defaultFilter,
@ -354,6 +362,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
server.awaitInitialization();
}
inventoryInitializationLatch.countDown();
log.info("Calling SegmentCallback.segmentViewInitialized() for all callbacks.");
runSegmentCallbacks(
@ -368,19 +378,28 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
);
}
private DruidServer serverAddedOrUpdated(DruidServer server)
private void serverAdded(DruidServer server)
{
DruidServerHolder newHolder = servers.compute(
DruidServerHolder holder = servers.computeIfAbsent(
server.getName(),
(k, v) -> v == null ? new DruidServerHolder(server) : v.updatedHolder(server)
k -> new DruidServerHolder(server)
);
newHolder.updateSegmentsListAsync();
return newHolder.druidServer;
if (holder.druidServer == server) {
holder.updateSegmentsListAsync();
} else {
log.info("Server[%s] already exists.", server.getName());
}
}
private void serverRemoved(DruidServer server)
{
servers.remove(server.getName());
DruidServerHolder holder = servers.remove(server.getName());
if (holder != null) {
runServerCallbacks(holder.druidServer);
} else {
log.info("Server[%s] did not exist. Removal notification ignored.", server.getName());
}
}
@Override
@ -401,73 +420,23 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private final Object lock = new Object();
//lock is used to keep state in counter and and segment list in druidServer consistent
// so that in "updateHolder()" method, new DruidServerHolder with updated DruidServer info
// can be safely created
private final DruidServer druidServer;
private volatile SegmentChangeRequestHistory.Counter counter = null;
private final HostAndPort serverHostAndPort;
private final DataSegmentChangeHandler changeHandler;
private final long serverHttpTimeout = config.getServerTimeout() + 1000;
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private volatile boolean isUnstable = false;
private volatile long unstableStartTime = -1;
private volatile int consecutiveFailedAttemptCount = 0;
DruidServerHolder(DruidServer druidServer)
{
this(druidServer, null);
}
private DruidServerHolder(final DruidServer druidServer, final SegmentChangeRequestHistory.Counter counter)
{
this.druidServer = druidServer;
this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost());
this.counter = counter;
changeHandler = new DataSegmentChangeHandler()
{
@Override
public void addSegment(
final DataSegment segment, final DataSegmentChangeCallback callback
)
{
if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
druidServer.addDataSegment(segment.getIdentifier(), segment);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(druidServer.getMetadata(), segment);
}
}
);
}
}
@Override
public void removeSegment(
final DataSegment segment, final DataSegmentChangeCallback callback
)
{
druidServer.removeDataSegment(segment.getIdentifier());
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(druidServer.getMetadata(), segment);
}
}
);
}
};
}
//wait for first fetch of segment listing from server.
@ -484,14 +453,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
DruidServerHolder updatedHolder(DruidServer server)
{
synchronized (lock) {
return new DruidServerHolder(server.addDataSegments(druidServer), counter);
}
}
Future<?> updateSegmentsListAsync()
void updateSegmentsListAsync()
{
try {
final String req;
@ -554,7 +516,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
synchronized (lock) {
if (delta.isResetCounter()) {
log.debug(
log.info(
"Server [%s] requested resetCounter for reason [%s].",
druidServer.getName(),
delta.getResetCause()
@ -564,17 +526,49 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
if (counter == null) {
druidServer.removeAllSegments();
// means, on last request either server had asked us to reset the counter or it was very first
// request to the server.
Map<String, DataSegment> toRemove = Maps.newHashMap(druidServer.getSegments());
for (DataSegmentChangeRequest request : delta.getRequests()) {
if (request instanceof SegmentChangeRequestLoad) {
DataSegment segment = ((SegmentChangeRequestLoad) request).getSegment();
toRemove.remove(segment.getIdentifier());
addSegment(segment);
} else {
log.error(
"Server[%s] gave a non-load dataSegmentChangeRequest[%s]., Ignored.",
druidServer.getName(),
request
);
}
}
for (DataSegment segmentToRemove : toRemove.values()) {
removeSegment(segmentToRemove);
}
} else {
for (DataSegmentChangeRequest request : delta.getRequests()) {
if (request instanceof SegmentChangeRequestLoad) {
addSegment(((SegmentChangeRequestLoad) request).getSegment());
} else if (request instanceof SegmentChangeRequestDrop) {
removeSegment(((SegmentChangeRequestDrop) request).getSegment());
} else {
log.error(
"Server[%s] gave a non load/drop dataSegmentChangeRequest[%s], Ignored.",
druidServer.getName(),
request
);
}
}
}
for (DataSegmentChangeRequest request : delta.getRequests()) {
request.go(changeHandler, null);
}
counter = delta.getCounter();
}
initializationLatch.countDown();
isUnstable = false;
consecutiveFailedAttemptCount = 0;
}
catch (Exception ex) {
log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
@ -595,7 +589,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
responseHandler.description
);
if (hasUnstabilityTimeoutPassed()) {
if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
if (t != null) {
log.error(t, logMsg);
} else {
@ -612,7 +606,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(5000);
Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount));
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@ -625,32 +619,80 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
},
executor
);
return future;
}
catch (Throwable th) {
addNextSyncToWorkQueue(druidServer.getName());
String logMsg = StringUtils.nonStrictFormat(
"Fatal error while fetching segment list from server [%s].", druidServer.getName()
);
if (hasUnstabilityTimeoutPassed()) {
log.makeAlert(th, logMsg).emit();
} else {
log.info("Temporary Failure. %s", logMsg);
log.debug(th, logMsg);
}
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(5000);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
String logMsg = StringUtils.nonStrictFormat(
"Fatal error while fetching segment list from server [%s].", druidServer.getName()
);
throw Throwables.propagate(th);
if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
log.makeAlert(th, logMsg).emit();
} else {
log.info("Temporary Failure. %s", logMsg);
log.debug(th, logMsg);
}
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount));
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
finally {
addNextSyncToWorkQueue(druidServer.getName());
}
}
}
private void addSegment(final DataSegment segment)
{
if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
if (druidServer.getSegment(segment.getIdentifier()) == null) {
druidServer.addDataSegment(segment.getIdentifier(), segment);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(druidServer.getMetadata(), segment);
}
}
);
} else {
log.warn(
"Not adding or running callbacks for existing segment[%s] on server[%s]",
segment.getIdentifier(),
druidServer.getName()
);
}
}
}
private void removeSegment(final DataSegment segment)
{
if (druidServer.getSegment(segment.getIdentifier()) != null) {
druidServer.removeDataSegment(segment.getIdentifier());
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(druidServer.getMetadata(), segment);
}
}
);
} else {
log.warn(
"Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
segment.getIdentifier(),
druidServer.getName()
);
}
}
@ -666,14 +708,14 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
);
}
private boolean hasUnstabilityTimeoutPassed()
private boolean incrementFailedAttemptAndCheckUnstabilityTimeout()
{
if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {
if (consecutiveFailedAttemptCount > 0
&& (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {
return true;
}
if (!isUnstable) {
isUnstable = true;
if (consecutiveFailedAttemptCount++ == 0) {
unstableStartTime = System.currentTimeMillis();
}

View File

@ -25,7 +25,6 @@ import com.google.common.base.Predicates;
import com.metamx.http.client.HttpClient;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Pair;
import io.druid.server.coordination.DruidServerMetadata;
@ -47,11 +46,6 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
@Smile
ObjectMapper smileMapper = null;
@JacksonInject
@NotNull
@Json
ObjectMapper jsonMapper = null;
@JacksonInject
@NotNull
HttpServerInventoryViewConfig config = null;
@ -64,7 +58,6 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
public HttpServerInventoryView get()
{
return new HttpServerInventoryView(
jsonMapper,
smileMapper,
httpClient,
druidNodeDiscoveryProvider,

View File

@ -20,16 +20,11 @@
package io.druid.server.coordination.broker;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.client.FilteredServerInventoryView;
import io.druid.client.ServerView;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.java.util.common.Pair;
@ -44,8 +39,6 @@ public class DruidBroker
{
private final DruidNode self;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DiscoveryDruidNode discoveryDruidNode;
private volatile boolean started = false;
@ -53,19 +46,11 @@ public class DruidBroker
public DruidBroker(
final FilteredServerInventoryView serverInventoryView,
final @Self DruidNode self,
final ServiceAnnouncer serviceAnnouncer,
final DruidNodeAnnouncer druidNodeAnnouncer,
final LookupNodeService lookupNodeService
final ServiceAnnouncer serviceAnnouncer
)
{
this.self = self;
this.serviceAnnouncer = serviceAnnouncer;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.discoveryDruidNode = new DiscoveryDruidNode(
self,
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of(lookupNodeService.getName(), lookupNodeService)
);
serverInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
@ -75,7 +60,6 @@ public class DruidBroker
public ServerView.CallbackAction segmentViewInitialized()
{
serviceAnnouncer.announce(self);
druidNodeAnnouncer.announce(discoveryDruidNode);
return ServerView.CallbackAction.UNREGISTER;
}
},
@ -103,7 +87,6 @@ public class DruidBroker
return;
}
serviceAnnouncer.unannounce(self);
druidNodeAnnouncer.unannounce(discoveryDruidNode);
started = false;
}
}

View File

@ -0,0 +1,335 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.RE;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestHistory;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ServerConfig;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class HttpServerInventoryViewTest
{
@Test(timeout = 10000)
public void testSimple() throws Exception
{
ObjectMapper jsonMapper = new DefaultObjectMapper();
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
final DataSegment segment1 = new DataSegment(
"test1", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
final DataSegment segment2 = new DataSegment(
"test2", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
final DataSegment segment3 = new DataSegment(
"test3", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
final DataSegment segment4 = new DataSegment(
"test4", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
TestHttpClient httpClient = new TestHttpClient(
ImmutableList.of(
Futures.immediateFuture(
new ByteArrayInputStream(
jsonMapper.writeValueAsBytes(
new SegmentChangeRequestsSnapshot(
false,
null,
SegmentChangeRequestHistory.Counter.ZERO,
ImmutableList.of(
new SegmentChangeRequestLoad(segment1)
)
)
)
)
),
Futures.immediateFuture(
new ByteArrayInputStream(
jsonMapper.writeValueAsBytes(
new SegmentChangeRequestsSnapshot(
false,
null,
SegmentChangeRequestHistory.Counter.ZERO,
ImmutableList.of(
new SegmentChangeRequestDrop(segment1),
new SegmentChangeRequestLoad(segment2),
new SegmentChangeRequestLoad(segment3)
)
)
)
)
),
Futures.immediateFuture(
new ByteArrayInputStream(
jsonMapper.writeValueAsBytes(
new SegmentChangeRequestsSnapshot(
true,
"force reset counter",
SegmentChangeRequestHistory.Counter.ZERO,
ImmutableList.of()
)
)
)
),
Futures.immediateFuture(
new ByteArrayInputStream(
jsonMapper.writeValueAsBytes(
new SegmentChangeRequestsSnapshot(
false,
null,
SegmentChangeRequestHistory.Counter.ZERO,
ImmutableList.of(
new SegmentChangeRequestLoad(segment3),
new SegmentChangeRequestLoad(segment4)
)
)
)
)
)
)
);
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
);
HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView(
jsonMapper,
httpClient,
druidNodeDiscoveryProvider,
Predicates.alwaysTrue(),
new HttpServerInventoryViewConfig(null, null, null)
);
CountDownLatch initializeCallback1 = new CountDownLatch(1);
Map<String, CountDownLatch> segmentAddLathces = ImmutableMap.of(
segment1.getIdentifier(), new CountDownLatch(1),
segment2.getIdentifier(), new CountDownLatch(1),
segment3.getIdentifier(), new CountDownLatch(1),
segment4.getIdentifier(), new CountDownLatch(1)
);
Map<String, CountDownLatch> segmentDropLatches = ImmutableMap.of(
segment1.getIdentifier(), new CountDownLatch(1),
segment2.getIdentifier(), new CountDownLatch(1)
);
httpServerInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.SegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServerMetadata server, DataSegment segment
)
{
segmentAddLathces.get(segment.getIdentifier()).countDown();
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(
DruidServerMetadata server, DataSegment segment
)
{
segmentDropLatches.get(segment.getIdentifier()).countDown();
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
initializeCallback1.countDown();
return ServerView.CallbackAction.CONTINUE;
}
}
);
final CountDownLatch serverRemovedCalled = new CountDownLatch(1);
httpServerInventoryView.registerServerRemovedCallback(
MoreExecutors.sameThreadExecutor(),
new ServerView.ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
if (server.getName().equals("host:8080")) {
serverRemovedCalled.countDown();
return ServerView.CallbackAction.CONTINUE;
} else {
throw new RE("Unknown server [%s]", server.getName());
}
}
}
);
httpServerInventoryView.start();
druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode));
initializeCallback1.await();
segmentAddLathces.get(segment1.getIdentifier()).await();
segmentDropLatches.get(segment1.getIdentifier()).await();
segmentAddLathces.get(segment2.getIdentifier()).await();
segmentAddLathces.get(segment3.getIdentifier()).await();
segmentAddLathces.get(segment4.getIdentifier()).await();
segmentDropLatches.get(segment2.getIdentifier()).await();
DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080");
Assert.assertEquals(ImmutableMap.of(segment3.getIdentifier(), segment3, segment4.getIdentifier(), segment4),
druidServer.getSegments());
druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
serverRemovedCalled.await();
Assert.assertNull(httpServerInventoryView.getInventoryValue("host:8080"));
EasyMock.verify(druidNodeDiscoveryProvider);
httpServerInventoryView.stop();
}
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
Listener listener;
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
throw new UnsupportedOperationException("Not Implemented.");
}
@Override
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
this.listener = listener;
}
}
private static class TestHttpClient implements HttpClient
{
BlockingQueue<ListenableFuture> results;
AtomicInteger requestNum = new AtomicInteger(0);
TestHttpClient(List<ListenableFuture> resultsList)
{
results = new LinkedBlockingQueue<>();
results.addAll(resultsList);
}
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler
)
{
throw new UnsupportedOperationException("Not Implemented.");
}
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration
)
{
if (requestNum.getAndIncrement() == 0) {
//fail first request immediately
throw new RuntimeException("simulating couldn't send request to server for some reason.");
}
if (requestNum.get() == 2) {
//fail scenario where request is sent to server but we got an unexpected response.
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
httpResponse.setContent(ChannelBuffers.buffer(0));
httpResponseHandler.handleResponse(httpResponse);
return Futures.immediateFailedFuture(new RuntimeException("server error"));
}
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
httpResponse.setContent(ChannelBuffers.buffer(0));
httpResponseHandler.handleResponse(httpResponse);
try {
return results.take();
}
catch (InterruptedException ex) {
throw new RE(ex, "Interrupted.");
}
}
}
}

View File

@ -21,6 +21,7 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
@ -33,6 +34,8 @@ import io.druid.client.cache.CacheMonitor;
import io.druid.client.selector.CustomTierSelectorStrategyConfig;
import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.client.selector.TierSelectorStrategy;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.CacheModule;
import io.druid.guice.DruidProcessingModule;
import io.druid.guice.Jerseys;
@ -120,6 +123,14 @@ public class CliBroker extends ServerRunnable
MetricsModule.register(binder, CacheMonitor.class);
LifecycleModule.register(binder, Server.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableList.of(LookupNodeService.class)
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
},
new LookupModule(),