diff --git a/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java b/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java
index ac7e34d8065..7b0e019283b 100644
--- a/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java
+++ b/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java
@@ -81,18 +81,27 @@ public class RetryUtils
     return retry(f, shouldRetry, 0, maxTries);
   }
 
-  private static void awaitNextRetry(final Throwable e, final int nTry, final boolean quiet) throws InterruptedException
+  private static void awaitNextRetry(Throwable e, final int nTry, final boolean quiet) throws InterruptedException
+  {
+    final long sleepMillis = nextRetrySleepMillis(nTry);
+
+    if (quiet) {
+      log.debug(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
+    } else {
+      log.warn(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
+    }
+
+    Thread.sleep(sleepMillis);
+  }
+
+  public static long nextRetrySleepMillis(final int nTry) throws InterruptedException
   {
     final long baseSleepMillis = 1000;
     final long maxSleepMillis = 60000;
     final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
     final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry - 1)) * fuzzyMultiplier);
-    if (quiet) {
-      log.debug(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
-    } else {
-      log.warn(e, "Failed on try %d, retrying in %,dms.", nTry, sleepMillis);
-    }
-    Thread.sleep(sleepMillis);
+    return sleepMillis;
   }
 }
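The helper extracted above computes a capped exponential backoff with Gaussian jitter: delay = min(60s, 1s * 2^(nTry-1)), scaled by a fuzz factor clamped to [0, 2]. A standalone sketch of the same formula for reference; the class name and the `main` driver are illustrative only, not part of the patch:

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the backoff-with-jitter formula used by nextRetrySleepMillis above.
public class BackoffSketch
{
  static long nextSleepMillis(int nTry)
  {
    final long baseSleepMillis = 1000;
    final long maxSleepMillis = 60000;
    // fuzzyMultiplier is clamped to [0, 2] and centered at 1, so the average delay stays near the nominal value
    final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
    // exponential growth: ~1s, 2s, 4s, ... capped at 60s, then jittered
    return (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry - 1)) * fuzzyMultiplier);
  }

  public static void main(String[] args)
  {
    for (int nTry = 1; nTry <= 8; nTry++) {
      System.out.printf("try %d -> ~%d ms%n", nTry, nextSleepMillis(nTry));
    }
  }
}
```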
diff --git a/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java
index 2741739ce42..b779ea9b2e4 100644
--- a/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java
+++ b/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java
@@ -25,7 +25,6 @@ import com.google.common.base.Predicates;
 import com.metamx.http.client.HttpClient;
 import io.druid.discovery.DruidNodeDiscoveryProvider;
 import io.druid.guice.annotations.Client;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.Pair;
 import io.druid.server.coordination.DruidServerMetadata;
@@ -47,11 +46,6 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
   @Smile
   ObjectMapper smileMapper = null;
 
-  @JacksonInject
-  @NotNull
-  @Json
-  ObjectMapper jsonMapper = null;
-
   @JacksonInject
   @NotNull
   HttpServerInventoryViewConfig config = null;
@@ -64,7 +58,7 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn
   public HttpServerInventoryView get()
   {
     return new HttpServerInventoryView(
-        jsonMapper, smileMapper, httpClient,
+        smileMapper, httpClient,
         druidNodeDiscoveryProvider,
         Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue(),
         config
diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java
index ab72b916f6e..d19ca3bb776 100644
--- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java
@@ -24,9 +24,9 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -45,18 +45,18 @@ import io.druid.discovery.DiscoveryDruidNode;
 import io.druid.discovery.DruidNodeDiscovery;
 import io.druid.discovery.DruidNodeDiscoveryProvider;
 import io.druid.guice.annotations.Global;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RetryUtils;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
-import io.druid.server.coordination.DataSegmentChangeCallback;
-import io.druid.server.coordination.DataSegmentChangeHandler;
 import io.druid.server.coordination.DataSegmentChangeRequest;
 import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.coordination.SegmentChangeRequestDrop;
 import io.druid.server.coordination.SegmentChangeRequestHistory;
+import io.druid.server.coordination.SegmentChangeRequestLoad;
 import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
 import io.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
@@ -77,9 +77,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This class uses CuratorInventoryManager to listen for queryable server membership which serve segments(e.g. Historicals).
@@ -114,9 +114,10 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
   private final ObjectMapper smileMapper;
   private final HttpServerInventoryViewConfig config;
 
+  private final CountDownLatch inventoryInitializationLatch = new CountDownLatch(1);
+
   @Inject
   public HttpServerInventoryView(
-      final @Json ObjectMapper jsonMapper,
       final @Smile ObjectMapper smileMapper,
       final @Global HttpClient httpClient,
       final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
@@ -182,17 +183,16 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
       druidNodeDiscovery.registerListener(
           new DruidNodeDiscovery.Listener()
           {
-            private volatile boolean initialized = false;
+            private final AtomicBoolean initialized = new AtomicBoolean(false);
 
             @Override
            public void nodesAdded(List<DiscoveryDruidNode> nodes)
            {
              nodes.forEach(
-                  node -> serverAddedOrUpdated(toDruidServer(node))
+                  node -> serverAdded(toDruidServer(node))
              );
 
-              if (!initialized) {
-                initialized = true;
+              if (!initialized.getAndSet(true)) {
                queue.add(HttpServerInventoryView.this::serverInventoryInitialized);
              }
            }
@@ -221,12 +221,19 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
           }
       );
 
-      log.info("Started HttpServerInventoryView.");
       lifecycleLock.started();
     }
     finally {
       lifecycleLock.exitStart();
     }
+
+    log.info("Waiting for Server Inventory Initialization...");
+
+    while (!inventoryInitializationLatch.await(1, TimeUnit.MINUTES)) {
+      log.info("Still waiting for Server Inventory Initialization...");
+    }
+
+    log.info("Started HttpServerInventoryView.");
   }
 }
@@ -256,8 +263,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
       Executor exec, SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter
   )
   {
-    segmentCallbacks.put(callback, exec);
-    segmentPredicates.put(callback, filter);
+    SegmentCallback filteringSegmentCallback = new SingleServerInventoryView.FilteringSegmentCallback(callback, filter);
+    segmentCallbacks.put(filteringSegmentCallback, exec);
+    segmentPredicates.put(filteringSegmentCallback, filter);
 
     finalPredicate = Predicates.or(
         defaultFilter,
@@ -354,6 +362,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
       server.awaitInitialization();
     }
 
+    inventoryInitializationLatch.countDown();
+
     log.info("Calling SegmentCallback.segmentViewInitialized() for all callbacks.");
 
     runSegmentCallbacks(
@@ -368,19 +378,28 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
     );
   }
 
-  private DruidServer serverAddedOrUpdated(DruidServer server)
+  private void serverAdded(DruidServer server)
   {
-    DruidServerHolder newHolder = servers.compute(
+    DruidServerHolder holder = servers.computeIfAbsent(
         server.getName(),
-        (k, v) -> v == null ? new DruidServerHolder(server) : v.updatedHolder(server)
+        k -> new DruidServerHolder(server)
     );
-    newHolder.updateSegmentsListAsync();
-    return newHolder.druidServer;
+
+    if (holder.druidServer == server) {
+      holder.updateSegmentsListAsync();
+    } else {
+      log.info("Server[%s] already exists.", server.getName());
+    }
   }
 
   private void serverRemoved(DruidServer server)
   {
-    servers.remove(server.getName());
+    DruidServerHolder holder = servers.remove(server.getName());
+    if (holder != null) {
+      runServerCallbacks(holder.druidServer);
+    } else {
+      log.info("Server[%s] did not exist. Removal notification ignored.", server.getName());
+    }
   }
 
   @Override
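The start-up path above now blocks `start()` until the first inventory sync completes: `serverInventoryInitialized()` counts down `inventoryInitializationLatch`, the discovery listener uses an `AtomicBoolean` so the initialization task is queued exactly once, and `start()` polls the latch with a one-minute timeout only so it can keep logging progress. A self-contained sketch of that gate pattern; the class and method names are illustrative, not the patch's:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of "block start() until the first sync" using a latch plus a one-shot flag.
public class InitializationGateSketch
{
  private final CountDownLatch initializedLatch = new CountDownLatch(1);
  private final AtomicBoolean initializeQueued = new AtomicBoolean(false);

  // Called from a discovery listener; only the first call triggers initialization.
  void onNodesAdded()
  {
    if (!initializeQueued.getAndSet(true)) {
      // in the real class this enqueues serverInventoryInitialized(); here we just open the gate
      initializedLatch.countDown();
    }
  }

  // Called from start(); waits, logging periodically instead of blocking silently forever.
  void awaitInitialized() throws InterruptedException
  {
    while (!initializedLatch.await(1, TimeUnit.MINUTES)) {
      System.out.println("Still waiting for inventory initialization...");
    }
  }
}
```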
@@ -401,73 +420,23 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
     private final Object lock = new Object();
 
     //lock is used to keep state in counter and and segment list in druidServer consistent
-    // so that in "updateHolder()" method, new DruidServerHolder with updated DruidServer info
-    // can be safely created
     private final DruidServer druidServer;
     private volatile SegmentChangeRequestHistory.Counter counter = null;
     private final HostAndPort serverHostAndPort;
-    private final DataSegmentChangeHandler changeHandler;
     private final long serverHttpTimeout = config.getServerTimeout() + 1000;
     private final CountDownLatch initializationLatch = new CountDownLatch(1);
-    private volatile boolean isUnstable = false;
     private volatile long unstableStartTime = -1;
+    private volatile int consecutiveFailedAttemptCount = 0;
 
     DruidServerHolder(DruidServer druidServer)
-    {
-      this(druidServer, null);
-    }
-
-    private DruidServerHolder(final DruidServer druidServer, final SegmentChangeRequestHistory.Counter counter)
     {
       this.druidServer = druidServer;
       this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost());
-      this.counter = counter;
-      changeHandler = new DataSegmentChangeHandler()
-      {
-        @Override
-        public void addSegment(
-            final DataSegment segment, final DataSegmentChangeCallback callback
-        )
-        {
-          if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
-            druidServer.addDataSegment(segment.getIdentifier(), segment);
-            runSegmentCallbacks(
-                new Function<SegmentCallback, CallbackAction>()
-                {
-                  @Override
-                  public CallbackAction apply(SegmentCallback input)
-                  {
-                    return input.segmentAdded(druidServer.getMetadata(), segment);
-                  }
-                }
-            );
-          }
-        }
-
-        @Override
-        public void removeSegment(
-            final DataSegment segment, final DataSegmentChangeCallback callback
-        )
-        {
-          druidServer.removeDataSegment(segment.getIdentifier());
-
-          runSegmentCallbacks(
-              new Function<SegmentCallback, CallbackAction>()
-              {
-                @Override
-                public CallbackAction apply(SegmentCallback input)
-                {
-                  return input.segmentRemoved(druidServer.getMetadata(), segment);
-                }
-              }
-          );
-        }
-      };
     }
 
     //wait for first fetch of segment listing from server.
@@ -484,14 +453,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
       }
     }
 
-    DruidServerHolder updatedHolder(DruidServer server)
-    {
-      synchronized (lock) {
-        return new DruidServerHolder(server.addDataSegments(druidServer), counter);
-      }
-    }
-
-    Future updateSegmentsListAsync()
+    void updateSegmentsListAsync()
    {
      try {
        final String req;
@@ -554,7 +516,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
 
              synchronized (lock) {
                if (delta.isResetCounter()) {
-                  log.debug(
+                  log.info(
                      "Server [%s] requested resetCounter for reason [%s].",
                      druidServer.getName(),
                      delta.getResetCause()
@@ -564,17 +526,49 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
                }
 
                if (counter == null) {
-                  druidServer.removeAllSegments();
+                  // means that on the last request the server asked us to reset the counter, or this
+                  // was the very first request to the server.
+                  Map<String, DataSegment> toRemove = Maps.newHashMap(druidServer.getSegments());
+
+                  for (DataSegmentChangeRequest request : delta.getRequests()) {
+                    if (request instanceof SegmentChangeRequestLoad) {
+                      DataSegment segment = ((SegmentChangeRequestLoad) request).getSegment();
+                      toRemove.remove(segment.getIdentifier());
+                      addSegment(segment);
+                    } else {
+                      log.error(
+                          "Server[%s] gave a non-load dataSegmentChangeRequest[%s]. Ignored.",
+                          druidServer.getName(),
+                          request
+                      );
+                    }
+                  }
+
+                  for (DataSegment segmentToRemove : toRemove.values()) {
+                    removeSegment(segmentToRemove);
+                  }
+
+                } else {
+                  for (DataSegmentChangeRequest request : delta.getRequests()) {
+                    if (request instanceof SegmentChangeRequestLoad) {
+                      addSegment(((SegmentChangeRequestLoad) request).getSegment());
+                    } else if (request instanceof SegmentChangeRequestDrop) {
+                      removeSegment(((SegmentChangeRequestDrop) request).getSegment());
+                    } else {
+                      log.error(
+                          "Server[%s] gave a non-load/drop dataSegmentChangeRequest[%s]. Ignored.",
+                          druidServer.getName(),
+                          request
+                      );
+                    }
+                  }
                }
 
-                for (DataSegmentChangeRequest request : delta.getRequests()) {
-                  request.go(changeHandler, null);
-                }
                counter = delta.getCounter();
              }
 
              initializationLatch.countDown();
-              isUnstable = false;
+              consecutiveFailedAttemptCount = 0;
            }
            catch (Exception ex) {
              log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
@@ -595,7 +589,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
                  responseHandler.description
              );
 
-              if (hasUnstabilityTimeoutPassed()) {
+              if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
                if (t != null) {
                  log.error(t, logMsg);
                } else {
@@ -612,7 +606,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
 
              // sleep for a bit so that retry does not happen immediately.
              try {
-                Thread.sleep(5000);
+                Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount));
              }
              catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
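The `counter == null` branch above treats the response as a full snapshot: anything the view currently holds for the server that the snapshot does not re-load is dropped, while the non-null branch applies the requests as an incremental delta of loads and drops. A self-contained sketch of that reconciliation idea, with plain strings standing in for Druid's segment types (all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative reconciliation of "full snapshot" vs "incremental delta"; not Druid code.
public class SnapshotReconcileSketch
{
  private final Map<String, String> held = new HashMap<>();

  // Full snapshot: every entry not re-announced by the server is removed.
  void applyFullSnapshot(Set<String> announced)
  {
    Map<String, String> toRemove = new HashMap<>(held);
    for (String id : announced) {
      toRemove.remove(id);
      held.put(id, id); // add or refresh
    }
    for (String id : toRemove.keySet()) {
      held.remove(id);
    }
  }

  // Incremental delta: loads and drops are applied as-is.
  void applyDelta(Set<String> loads, Set<String> drops)
  {
    for (String id : loads) {
      held.put(id, id);
    }
    for (String id : drops) {
      held.remove(id);
    }
  }
}
```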
@@ -625,32 +619,80 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
            },
            executor
        );
-
-        return future;
      }
      catch (Throwable th) {
-        addNextSyncToWorkQueue(druidServer.getName());
-
-        String logMsg = StringUtils.nonStrictFormat(
-            "Fatal error while fetching segment list from server [%s].", druidServer.getName()
-        );
-
-        if (hasUnstabilityTimeoutPassed()) {
-          log.makeAlert(th, logMsg).emit();
-        } else {
-          log.info("Temporary Failure. %s", logMsg);
-          log.debug(th, logMsg);
-        }
-
-        // sleep for a bit so that retry does not happen immediately.
        try {
-          Thread.sleep(5000);
-        }
-        catch (InterruptedException ex) {
-          Thread.currentThread().interrupt();
-        }
+          String logMsg = StringUtils.nonStrictFormat(
+              "Fatal error while fetching segment list from server [%s].", druidServer.getName()
+          );
 
-        throw Throwables.propagate(th);
+          if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
+            log.makeAlert(th, logMsg).emit();
+          } else {
+            log.info("Temporary Failure. %s", logMsg);
+            log.debug(th, logMsg);
+          }
+
+          // sleep for a bit so that retry does not happen immediately.
+          try {
+            Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount));
+          }
+          catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        finally {
+          addNextSyncToWorkQueue(druidServer.getName());
+        }
+      }
+    }
+
+    private void addSegment(final DataSegment segment)
+    {
+      if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) {
+        if (druidServer.getSegment(segment.getIdentifier()) == null) {
+          druidServer.addDataSegment(segment.getIdentifier(), segment);
+          runSegmentCallbacks(
+              new Function<SegmentCallback, CallbackAction>()
+              {
+                @Override
+                public CallbackAction apply(SegmentCallback input)
+                {
+                  return input.segmentAdded(druidServer.getMetadata(), segment);
+                }
+              }
+          );
+        } else {
+          log.warn(
+              "Not adding or running callbacks for existing segment[%s] on server[%s]",
+              segment.getIdentifier(),
+              druidServer.getName()
+          );
+        }
+      }
+    }
+
+    private void removeSegment(final DataSegment segment)
+    {
+      if (druidServer.getSegment(segment.getIdentifier()) != null) {
+        druidServer.removeDataSegment(segment.getIdentifier());
+
+        runSegmentCallbacks(
+            new Function<SegmentCallback, CallbackAction>()
+            {
+              @Override
+              public CallbackAction apply(SegmentCallback input)
+              {
+                return input.segmentRemoved(druidServer.getMetadata(), segment);
+              }
+            }
+        );
+      } else {
+        log.warn(
+            "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
+            segment.getIdentifier(),
+            druidServer.getName()
+        );
      }
    }
@@ -666,14 +708,14 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
      );
    }
 
-    private boolean hasUnstabilityTimeoutPassed()
+    private boolean incrementFailedAttemptAndCheckUnstabilityTimeout()
    {
-      if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {
+      if (consecutiveFailedAttemptCount > 0
+          && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {
        return true;
      }
 
-      if (!isUnstable) {
-        isUnstable = true;
+      if (consecutiveFailedAttemptCount++ == 0) {
        unstableStartTime = System.currentTimeMillis();
      }
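Taken together, the changes above replace the fixed 5-second sleep and the single `isUnstable` flag with a consecutive-failure counter: the counter drives both the backoff delay (via `RetryUtils.nextRetrySleepMillis`) and the decision to escalate from quiet "temporary failure" logging to an emitted alert once the unstability window has passed. A rough self-contained sketch of that escalation logic; the names and the 90-second window are illustrative, not Druid's actual configuration:

```java
// Illustrative failure-escalation tracker mirroring incrementFailedAttemptAndCheckUnstabilityTimeout; not Druid code.
public class UnstabilityTrackerSketch
{
  private final long unstabilityTimeoutMillis;
  private volatile long unstableStartTime = -1;
  private volatile int consecutiveFailedAttemptCount = 0;

  public UnstabilityTrackerSketch(long unstabilityTimeoutMillis)
  {
    this.unstabilityTimeoutMillis = unstabilityTimeoutMillis; // e.g. 90_000 in this sketch
  }

  // Returns true once failures have persisted longer than the configured window,
  // i.e. when the caller should alert instead of logging quietly.
  public boolean incrementFailedAttemptAndCheckTimeout()
  {
    if (consecutiveFailedAttemptCount > 0
        && (System.currentTimeMillis() - unstableStartTime) > unstabilityTimeoutMillis) {
      return true;
    }
    if (consecutiveFailedAttemptCount++ == 0) {
      unstableStartTime = System.currentTimeMillis();
    }
    return false;
  }

  // A successful sync clears the streak.
  public void reset()
  {
    consecutiveFailedAttemptCount = 0;
  }
}
```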
diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java
index 13a71f0928e..22ba09fff04 100644
--- a/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java
+++ b/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java
@@ -25,7 +25,6 @@ import com.google.common.base.Predicates;
 import com.metamx.http.client.HttpClient;
 import io.druid.discovery.DruidNodeDiscoveryProvider;
 import io.druid.guice.annotations.Client;
-import io.druid.guice.annotations.Json;
 import io.druid.guice.annotations.Smile;
 import io.druid.java.util.common.Pair;
 import io.druid.server.coordination.DruidServerMetadata;
@@ -47,11 +46,6 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
   @Smile
   ObjectMapper smileMapper = null;
 
-  @JacksonInject
-  @NotNull
-  @Json
-  ObjectMapper jsonMapper = null;
-
   @JacksonInject
   @NotNull
   HttpServerInventoryViewConfig config = null;
@@ -64,7 +58,6 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi
   public HttpServerInventoryView get()
   {
     return new HttpServerInventoryView(
-        jsonMapper,
         smileMapper,
         httpClient,
         druidNodeDiscoveryProvider,
diff --git a/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java b/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java
index 0be3020ae87..c20cb13a1f4 100644
--- a/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java
+++ b/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java
@@ -20,16 +20,11 @@ package io.druid.server.coordination.broker;
 
 import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import io.druid.client.FilteredServerInventoryView;
 import io.druid.client.ServerView;
 import io.druid.curator.discovery.ServiceAnnouncer;
-import io.druid.discovery.DiscoveryDruidNode;
-import io.druid.discovery.DruidNodeAnnouncer;
-import io.druid.discovery.DruidNodeDiscoveryProvider;
-import io.druid.discovery.LookupNodeService;
 import io.druid.guice.ManageLifecycle;
 import io.druid.guice.annotations.Self;
 import io.druid.java.util.common.Pair;
@@ -44,8 +39,6 @@ public class DruidBroker
 {
   private final DruidNode self;
   private final ServiceAnnouncer serviceAnnouncer;
-  private final DruidNodeAnnouncer druidNodeAnnouncer;
-  private final DiscoveryDruidNode discoveryDruidNode;
 
   private volatile boolean started = false;
 
@@ -53,19 +46,11 @@ public class DruidBroker
   public DruidBroker(
       final FilteredServerInventoryView serverInventoryView,
       final @Self DruidNode self,
-      final ServiceAnnouncer serviceAnnouncer,
-      final DruidNodeAnnouncer druidNodeAnnouncer,
-      final LookupNodeService lookupNodeService
+      final ServiceAnnouncer serviceAnnouncer
   )
   {
     this.self = self;
     this.serviceAnnouncer = serviceAnnouncer;
-    this.druidNodeAnnouncer = druidNodeAnnouncer;
-    this.discoveryDruidNode = new DiscoveryDruidNode(
-        self,
-        DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
-        ImmutableMap.of(lookupNodeService.getName(), lookupNodeService)
-    );
 
     serverInventoryView.registerSegmentCallback(
         MoreExecutors.sameThreadExecutor(),
@@ -75,7 +60,6 @@ public class DruidBroker
           public ServerView.CallbackAction segmentViewInitialized()
           {
             serviceAnnouncer.announce(self);
-            druidNodeAnnouncer.announce(discoveryDruidNode);
             return ServerView.CallbackAction.UNREGISTER;
           }
         },
@@ -103,7 +87,6 @@ public class DruidBroker
       return;
     }
     serviceAnnouncer.unannounce(self);
-    druidNodeAnnouncer.unannounce(discoveryDruidNode);
     started = false;
   }
 }
diff --git a/server/src/test/java/io/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/io/druid/client/HttpServerInventoryViewTest.java
new file mode 100644
index 00000000000..09a46e19c3b
--- /dev/null
+++ b/server/src/test/java/io/druid/client/HttpServerInventoryViewTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.HttpResponseHandler;
+import io.druid.discovery.DataNodeService;
+import io.druid.discovery.DiscoveryDruidNode;
+import io.druid.discovery.DruidNodeDiscovery;
+import io.druid.discovery.DruidNodeDiscoveryProvider;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.RE;
+import io.druid.server.DruidNode;
+import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.coordination.SegmentChangeRequestDrop;
+import io.druid.server.coordination.SegmentChangeRequestHistory;
+import io.druid.server.coordination.SegmentChangeRequestLoad;
+import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
+import io.druid.server.coordination.ServerType;
+import io.druid.server.initialization.ServerConfig;
+import io.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ */
+public class HttpServerInventoryViewTest
+{
+  @Test(timeout = 10000)
+  public void testSimple() throws Exception
+  {
+    ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    final DataSegment segment1 = new DataSegment(
+        "test1", Intervals.of("2014/2015"), "v1",
+        null, null, null, null, 0, 0
+    );
+
+    final DataSegment segment2 = new DataSegment(
+        "test2", Intervals.of("2014/2015"), "v1",
+        null, null, null, null, 0, 0
+    );
+
+    final DataSegment segment3 = new DataSegment(
+        "test3", Intervals.of("2014/2015"), "v1",
+        null, null, null, null, 0, 0
+    );
+
+    final DataSegment segment4 = new DataSegment(
+        "test4", Intervals.of("2014/2015"), "v1",
+        null, null, null, null, 0, 0
+    );
+
+    TestHttpClient httpClient = new TestHttpClient(
+        ImmutableList.of(
+            Futures.immediateFuture(
+                new ByteArrayInputStream(
+                    jsonMapper.writeValueAsBytes(
+                        new SegmentChangeRequestsSnapshot(
+                            false,
+                            null,
+                            SegmentChangeRequestHistory.Counter.ZERO,
+                            ImmutableList.of(
+                                new SegmentChangeRequestLoad(segment1)
+                            )
+                        )
+                    )
+                )
+            ),
+            Futures.immediateFuture(
+                new ByteArrayInputStream(
+                    jsonMapper.writeValueAsBytes(
+                        new SegmentChangeRequestsSnapshot(
+                            false,
+                            null,
+                            SegmentChangeRequestHistory.Counter.ZERO,
+                            ImmutableList.of(
+                                new SegmentChangeRequestDrop(segment1),
+                                new SegmentChangeRequestLoad(segment2),
+                                new SegmentChangeRequestLoad(segment3)
+                            )
+                        )
+                    )
+                )
+            ),
+            Futures.immediateFuture(
+                new ByteArrayInputStream(
+                    jsonMapper.writeValueAsBytes(
+                        new SegmentChangeRequestsSnapshot(
+                            true,
+                            "force reset counter",
+                            SegmentChangeRequestHistory.Counter.ZERO,
+                            ImmutableList.of()
+                        )
+                    )
+                )
+            ),
+            Futures.immediateFuture(
+                new ByteArrayInputStream(
+                    jsonMapper.writeValueAsBytes(
+                        new SegmentChangeRequestsSnapshot(
+                            false,
+                            null,
+                            SegmentChangeRequestHistory.Counter.ZERO,
+                            ImmutableList.of(
+                                new SegmentChangeRequestLoad(segment3),
+                                new SegmentChangeRequestLoad(segment4)
+                            )
+                        )
+                    )
+                )
+            )
+        )
+    );
+
+    DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
+        new DruidNode("service", "host", 8080, null, new ServerConfig()),
+        DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
+        ImmutableMap.of(
+            DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
+        )
+    );
+
+    HttpServerInventoryView httpServerInventoryView = new HttpServerInventoryView(
+        jsonMapper,
+        httpClient,
+        druidNodeDiscoveryProvider,
+        Predicates.alwaysTrue(),
+        new HttpServerInventoryViewConfig(null, null, null)
+    );
+
+    CountDownLatch initializeCallback1 = new CountDownLatch(1);
+
+    Map<String, CountDownLatch> segmentAddLatches = ImmutableMap.of(
+        segment1.getIdentifier(), new CountDownLatch(1),
+        segment2.getIdentifier(), new CountDownLatch(1),
+        segment3.getIdentifier(), new CountDownLatch(1),
+        segment4.getIdentifier(), new CountDownLatch(1)
+    );
+
+    Map<String, CountDownLatch> segmentDropLatches = ImmutableMap.of(
+        segment1.getIdentifier(), new CountDownLatch(1),
+        segment2.getIdentifier(), new CountDownLatch(1)
+    );
+
+    httpServerInventoryView.registerSegmentCallback(
+        MoreExecutors.sameThreadExecutor(),
+        new ServerView.SegmentCallback()
+        {
+          @Override
+          public ServerView.CallbackAction segmentAdded(
+              DruidServerMetadata server, DataSegment segment
+          )
+          {
+            segmentAddLatches.get(segment.getIdentifier()).countDown();
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction segmentRemoved(
+              DruidServerMetadata server, DataSegment segment
+          )
+          {
+            segmentDropLatches.get(segment.getIdentifier()).countDown();
+            return ServerView.CallbackAction.CONTINUE;
+          }
+
+          @Override
+          public ServerView.CallbackAction segmentViewInitialized()
+          {
+            initializeCallback1.countDown();
+            return ServerView.CallbackAction.CONTINUE;
+          }
+        }
+    );
+
+    final CountDownLatch serverRemovedCalled = new CountDownLatch(1);
+    httpServerInventoryView.registerServerRemovedCallback(
+        MoreExecutors.sameThreadExecutor(),
+        new ServerView.ServerRemovedCallback()
+        {
+          @Override
+          public ServerView.CallbackAction serverRemoved(DruidServer server)
+          {
+            if (server.getName().equals("host:8080")) {
+              serverRemovedCalled.countDown();
+              return ServerView.CallbackAction.CONTINUE;
+            } else {
+              throw new RE("Unknown server [%s]", server.getName());
+            }
+          }
+        }
+    );
+
+    httpServerInventoryView.start();
+
+    druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode));
+
+    initializeCallback1.await();
+    segmentAddLatches.get(segment1.getIdentifier()).await();
+    segmentDropLatches.get(segment1.getIdentifier()).await();
+    segmentAddLatches.get(segment2.getIdentifier()).await();
+    segmentAddLatches.get(segment3.getIdentifier()).await();
+    segmentAddLatches.get(segment4.getIdentifier()).await();
+    segmentDropLatches.get(segment2.getIdentifier()).await();
+
+    DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080");
+    Assert.assertEquals(ImmutableMap.of(segment3.getIdentifier(), segment3, segment4.getIdentifier(), segment4),
+                        druidServer.getSegments());
+
+    druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
+
+    serverRemovedCalled.await();
+    Assert.assertNull(httpServerInventoryView.getInventoryValue("host:8080"));
+
+    EasyMock.verify(druidNodeDiscoveryProvider);
+
+    httpServerInventoryView.stop();
+  }
+
+  private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
+  {
+    Listener listener;
+
+    @Override
+    public Collection<DiscoveryDruidNode> getAllNodes()
+    {
+      throw new UnsupportedOperationException("Not Implemented.");
+    }
+
+    @Override
+    public void registerListener(Listener listener)
+    {
+      listener.nodesAdded(ImmutableList.of());
+      this.listener = listener;
+    }
+  }
+
+  private static class TestHttpClient implements HttpClient
+  {
+    BlockingQueue results;
+    AtomicInteger requestNum = new AtomicInteger(0);
+
+    TestHttpClient(List resultsList)
+    {
+      results = new LinkedBlockingQueue<>();
+      results.addAll(resultsList);
+    }
+
+    @Override
+    public ListenableFuture go(
+        Request request, HttpResponseHandler httpResponseHandler
+    )
+    {
+      throw new UnsupportedOperationException("Not Implemented.");
+    }
+
+    @Override
+    public ListenableFuture go(
+        Request request, HttpResponseHandler httpResponseHandler, Duration duration
+    )
+    {
+      if (requestNum.getAndIncrement() == 0) {
+        //fail first request immediately
+        throw new RuntimeException("simulating couldn't send request to server for some reason.");
+      }
+
+      if (requestNum.get() == 2) {
+        //fail scenario where request is sent to server but we got an unexpected response.
+        HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        httpResponse.setContent(ChannelBuffers.buffer(0));
+        httpResponseHandler.handleResponse(httpResponse);
+        return Futures.immediateFailedFuture(new RuntimeException("server error"));
+      }
+
+      HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      httpResponse.setContent(ChannelBuffers.buffer(0));
+      httpResponseHandler.handleResponse(httpResponse);
+      try {
+        return results.take();
+      }
+      catch (InterruptedException ex) {
+        throw new RE(ex, "Interrupted.");
+      }
+    }
+  }
+}
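The TestHttpClient in the new test scripts the server's behaviour per request: the first call throws before anything is sent, the second simulates an unexpected 500 response, and later calls hand out pre-serialized snapshots from a queue, which is what exercises the retry and counter-reset paths above. A stripped-down, self-contained sketch of that "scripted responses" test double; the names and the failure schedule here are illustrative, not the test's actual API:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative "scripted responses" test double mirroring the idea behind TestHttpClient; not Druid code.
public class ScriptedClientSketch
{
  private final Queue<String> responses = new ArrayDeque<>();
  private final AtomicInteger requestNum = new AtomicInteger(0);

  public ScriptedClientSketch(Iterable<String> scripted)
  {
    scripted.forEach(responses::add);
  }

  public String send()
  {
    int n = requestNum.getAndIncrement();
    if (n == 0) {
      // first call: simulate not being able to reach the server at all
      throw new RuntimeException("simulated connection failure");
    }
    if (n == 1) {
      // second call: simulate a 500-style error response
      throw new IllegalStateException("simulated server error");
    }
    // later calls: hand out the next canned payload, in order
    return responses.poll();
  }
}
```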
diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java
index 95586e7af29..3bade6247bf 100644
--- a/services/src/main/java/io/druid/cli/CliBroker.java
+++ b/services/src/main/java/io/druid/cli/CliBroker.java
@@ -21,6 +21,7 @@ package io.druid.cli;
 
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
+import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.airlift.airline.Command;
@@ -33,6 +34,8 @@ import io.druid.client.cache.CacheMonitor;
 import io.druid.client.selector.CustomTierSelectorStrategyConfig;
 import io.druid.client.selector.ServerSelectorStrategy;
 import io.druid.client.selector.TierSelectorStrategy;
+import io.druid.discovery.DruidNodeDiscoveryProvider;
+import io.druid.discovery.LookupNodeService;
 import io.druid.guice.CacheModule;
 import io.druid.guice.DruidProcessingModule;
 import io.druid.guice.Jerseys;
@@ -120,6 +123,14 @@ public class CliBroker extends ServerRunnable
         MetricsModule.register(binder, CacheMonitor.class);
 
         LifecycleModule.register(binder, Server.class);
+
+        binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
+            new DiscoverySideEffectsProvider(
+                DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
+                ImmutableList.of(LookupNodeService.class)
+            )
+        ).in(LazySingleton.class);
+        LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
       }
     },
     new LookupModule(),