diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index cfec3e08322..59e04135973 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -241,6 +241,12 @@ public class RealtimeIndexTask extends AbstractTask } } } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return toolbox.getSegmentAnnouncer().isAnnounced(segment); + } }; // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 8b0f02a8479..9cd2449df01 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -510,6 +510,12 @@ public class TaskLifecycleTest { } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return false; + } }, // segment announcer handoffNotifierFactory, queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java index be5ef6df9ea..2e36d2a1818 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java @@ -59,6 +59,12 @@ public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer } } + @Override + public boolean isAnnounced(DataSegment segment) + { + return announcedSegments.contains(segment); + } + public Set getAnnouncedSegments() { return ImmutableSet.copyOf(announcedSegments); diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoader.java b/server/src/main/java/io/druid/segment/loading/SegmentLoader.java index a9cd5cadc80..2842f91dd51 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoader.java @@ -31,5 +31,5 @@ public interface SegmentLoader public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException; public Segment getSegment(DataSegment segment) throws SegmentLoadingException; public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; - public void cleanup(DataSegment loadSpec) throws SegmentLoadingException; + public void cleanup(DataSegment segment) throws SegmentLoadingException; } diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 25264d5e69d..503649a5073 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -120,8 +120,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); availableZNode.addSegment(segment); - log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), - availableZNode.getPath()); + log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath()); announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); segmentLookup.put(segment, availableZNode); availableZNodes.add(availableZNode); @@ -196,9 +195,21 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer } } - private String makeServedSegmentPath(){ + @Override + public boolean isAnnounced(DataSegment segment) + { + return segmentLookup.containsKey(segment); + } + + private String makeServedSegmentPath() + { // server.getName() is already in the zk path - return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString())); + return makeServedSegmentPath(UUIDUtils.generateUuid( + server.getHost(), + server.getType(), + server.getTier(), + new DateTime().toString() + )); } private String makeServedSegmentPath(String zNode) @@ -241,8 +252,8 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer try { return jsonMapper.readValue( bytes, new TypeReference>() - { - } + { + } ); } catch (Exception e) { diff --git a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java index d6e8c4a27a8..ef6eafab6a7 100644 --- a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java @@ -32,4 +32,9 @@ public interface DataSegmentAnnouncer public void announceSegments(Iterable segments) throws IOException; public void unannounceSegments(Iterable segments) throws IOException; + + /** + * @return true if the segment was already announced, otherwise false + */ + public boolean isAnnounced(DataSegment segment); } diff --git a/server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java deleted file mode 100644 index 64fd4bb90e5..00000000000 --- a/server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.coordination; - -import io.druid.timeline.DataSegment; - -import java.io.IOException; - -/** - * This class has the greatest name ever - */ -public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer -{ - private final Iterable dataSegmentAnnouncers; - - public MultipleDataSegmentAnnouncerDataSegmentAnnouncer( - Iterable dataSegmentAnnouncers - ) - { - this.dataSegmentAnnouncers = dataSegmentAnnouncers; - } - - @Override - public void announceSegment(DataSegment segment) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.announceSegment(segment); - } - } - - @Override - public void unannounceSegment(DataSegment segment) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.unannounceSegment(segment); - } - } - - @Override - public void announceSegments(Iterable segments) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.announceSegments(segments); - } - } - - @Override - public void unannounceSegments(Iterable segments) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.unannounceSegments(segments); - } - } -} diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 91749d120e1..77df75097c3 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -46,6 +46,7 @@ import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -72,6 +73,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final DataSegmentAnnouncer announcer; private final ServerManager serverManager; private final ScheduledExecutorService exec; + private final ConcurrentSkipListSet segmentsToDelete; private volatile PathChildrenCache loadQueueCache; @@ -98,6 +100,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler this.serverManager = serverManager; this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); + this.segmentsToDelete = new ConcurrentSkipListSet<>(); } @LifecycleStart @@ -289,7 +292,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler return ZkCoordinator.this; } - private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + /** + * Load a single segment. If the segment is loaded succesfully, this function simply returns. Otherwise it will + * throw a SegmentLoadingException + * + * @throws SegmentLoadingException + */ + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException { final boolean loaded; try { @@ -314,7 +323,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler } } } - return loaded; } @Override @@ -322,7 +330,25 @@ public class ZkCoordinator implements DataSegmentChangeHandler { try { log.info("Loading segment %s", segment.getIdentifier()); - if (loadSegment(segment, callback)) { + /* + The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, + and if(segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment + files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads + the segment, which makes dropping segment and downloading segment happen at the same time. + */ + if (segmentsToDelete.contains(segment)) { + /* + Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, + each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make + things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of + cost of acquiring lock by doing the "contains" check outside the synchronized block. + */ + synchronized (lock) { + segmentsToDelete.remove(segment); + } + } + loadSegment(segment, callback); + if (!announcer.isAnnounced(segment)) { try { announcer.announceSegment(segment); } @@ -369,8 +395,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler numSegments, segment.getIdentifier() ); - final boolean loaded = loadSegment(segment, callback); - if (loaded) { + loadSegment(segment, callback); + if (!announcer.isAnnounced(segment)) { try { backgroundSegmentAnnouncer.announceSegment(segment); } @@ -426,6 +452,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler { try { announcer.unannounceSegment(segment); + segmentsToDelete.add(segment); log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); exec.schedule( @@ -435,11 +462,15 @@ public class ZkCoordinator implements DataSegmentChangeHandler public void run() { try { - serverManager.dropSegment(segment); + synchronized (lock) { + if (segmentsToDelete.remove(segment)) { + serverManager.dropSegment(segment); - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } } } catch (Exception e) { diff --git a/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java index a6b4b61468c..7b5137e7fb5 100644 --- a/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java @@ -28,12 +28,17 @@ import org.joda.time.Interval; import java.io.File; import java.io.IOException; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** */ public class CacheTestSegmentLoader implements SegmentLoader { + + private final Set segmentsInTrash = new HashSet<>(); + @Override public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException { @@ -84,7 +89,13 @@ public class CacheTestSegmentLoader implements SegmentLoader } @Override - public void cleanup(DataSegment loadSpec) throws SegmentLoadingException + public void cleanup(DataSegment segment) throws SegmentLoadingException { + segmentsInTrash.add(segment); + } + + public Set getSegmentsInTrash() + { + return segmentsInTrash; } } diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index 8c0fae715af..04dcdebf3c1 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -59,20 +60,39 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** */ public class ZkCoordinatorTest extends CuratorTestBase { - private static final Logger log = new Logger(ZkCoordinatorTest.class); public static final int COUNT = 50; + + private static final Logger log = new Logger(ZkCoordinatorTest.class); + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private final DruidServerMetadata me = new DruidServerMetadata( + "dummyServer", + "dummyHost", + 0, + "dummyType", + "normal", + 0 + ); + private ZkCoordinator zkCoordinator; private ServerManager serverManager; private DataSegmentAnnouncer announcer; private File infoDir; private AtomicInteger announceCount; + private ConcurrentSkipListSet segmentsAnnouncedByMe; + private CacheTestSegmentLoader segmentLoader; + private List scheduledRunnable; @Before public void setUp() throws Exception @@ -92,8 +112,12 @@ public class ZkCoordinatorTest extends CuratorTestBase throw new RuntimeException(e); } + scheduledRunnable = Lists.newArrayList(); + + segmentLoader = new CacheTestSegmentLoader(); + serverManager = new ServerManager( - new CacheTestSegmentLoader(), + segmentLoader, new NoopQueryRunnerFactoryConglomerate(), new NoopServiceEmitter(), MoreExecutors.sameThreadExecutor(), @@ -103,8 +127,6 @@ public class ZkCoordinatorTest extends CuratorTestBase new CacheConfig() ); - final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0); - final ZkPathsConfig zkPaths = new ZkPathsConfig() { @Override @@ -114,6 +136,7 @@ public class ZkCoordinatorTest extends CuratorTestBase } }; + segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); announceCount = new AtomicInteger(0); announcer = new DataSegmentAnnouncer() { @@ -128,6 +151,7 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void announceSegment(DataSegment segment) throws IOException { + segmentsAnnouncedByMe.add(segment); announceCount.incrementAndGet(); delegate.announceSegment(segment); } @@ -135,6 +159,7 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void unannounceSegment(DataSegment segment) throws IOException { + segmentsAnnouncedByMe.remove(segment); announceCount.decrementAndGet(); delegate.unannounceSegment(segment); } @@ -142,6 +167,9 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void announceSegments(Iterable segments) throws IOException { + for (DataSegment segment : segments) { + segmentsAnnouncedByMe.add(segment); + } announceCount.addAndGet(Iterables.size(segments)); delegate.announceSegments(segments); } @@ -149,9 +177,18 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void unannounceSegments(Iterable segments) throws IOException { + for (DataSegment segment : segments) { + segmentsAnnouncedByMe.remove(segment); + } announceCount.addAndGet(-Iterables.size(segments)); delegate.unannounceSegments(segments); } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return segmentsAnnouncedByMe.contains(segment); + } }; zkCoordinator = new ZkCoordinator( @@ -175,13 +212,42 @@ public class ZkCoordinatorTest extends CuratorTestBase { return 50; } + + @Override + public int getDropSegmentDelayMillis() + { + return 0; + } }, zkPaths, me, announcer, curator, serverManager, - ScheduledExecutors.createFactory(new Lifecycle()) + new ScheduledExecutorFactory() + { + @Override + public ScheduledExecutorService create(int corePoolSize, String nameFormat) + { + /* + Override normal behavoir by adding the runnable to a list so that you can make sure + all the shceduled runnables are executed by explicitly calling run() on each item in the list + */ + return new ScheduledThreadPoolExecutor( + corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build() + ) + { + @Override + public ScheduledFuture schedule( + Runnable command, long delay, TimeUnit unit + ) + { + scheduledRunnable.add(command); + return null; + } + }; + } + } ); } @@ -191,6 +257,114 @@ public class ZkCoordinatorTest extends CuratorTestBase tearDownServerAndCurator(); } + /** + * Steps: + * 1. removeSegment() schedules a delete runnable that deletes segment files, + * 2. addSegment() succesfully loads the segment and annouces it + * 3. scheduled delete task executes and realizes it should not delete the segment files. + */ + @Test + public void testSegmentLoading1() throws Exception + { + zkCoordinator.start(); + + final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01")); + + zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + + zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + /* + make sure the scheduled runnable that "deletes" segment files has been executed. + Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in + ZkCoordinator, the scheduled runnable will not actually delete segment files. + */ + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + + Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); + + zkCoordinator.stop(); + } + + /** + * Steps: + * 1. addSegment() succesfully loads the segment and annouces it + * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files + * 3. addSegment() calls loadSegment() and annouces it again + * 4. scheduled delete task executes and realizes it should not delete the segment files. + */ + @Test + public void testSegmentLoading2() throws Exception + { + zkCoordinator.start(); + + final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01")); + + zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + + zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + + zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + /* + make sure the scheduled runnable that "deletes" segment files has been executed. + Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in + ZkCoordinator, the scheduled runnable will not actually delete segment files. + */ + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + + Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); + + zkCoordinator.stop(); + } + @Test public void testLoadCache() throws Exception { diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index a4b51777d08..2f91d383413 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -152,5 +152,11 @@ public class CliRealtimeExample extends ServerRunnable { // do nothing } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return false; + } } }