From c4ad50f92c25f9af06dbb9f9278012f62b4147b8 Mon Sep 17 00:00:00 2001 From: Bingkun Guo Date: Thu, 17 Dec 2015 15:26:32 -0600 Subject: [PATCH] Fix loading segment for historical Historical will drop a segment that shouldn't be dropped in the following scenario: Historical node tried to load segmentA, but failed with SegmentLoadingException, then ZkCoordinator called removeSegment(segmentA, blah) to schedule a runnable that would drop segmentA by deleting its files. Now, before that runnable executed, another LOAD request was sent to this historical, this time historical actually succeeded on loading segmentA and announced it. But later on, the scheduled drop-of-segment runnable started executing and removed the segment files, while historical is still announcing segmentA. --- .../common/task/RealtimeIndexTask.java | 6 + .../indexing/overlord/TaskLifecycleTest.java | 6 + .../test/TestDataSegmentAnnouncer.java | 6 + .../druid/segment/loading/SegmentLoader.java | 2 +- .../BatchDataSegmentAnnouncer.java | 23 ++- .../coordination/DataSegmentAnnouncer.java | 5 + ...aSegmentAnnouncerDataSegmentAnnouncer.java | 71 ------- .../server/coordination/ZkCoordinator.java | 49 ++++- .../loading/CacheTestSegmentLoader.java | 13 +- .../coordination/ZkCoordinatorTest.java | 184 +++++++++++++++++- .../java/io/druid/cli/CliRealtimeExample.java | 6 + 11 files changed, 278 insertions(+), 93 deletions(-) delete mode 100644 server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index cfec3e08322..59e04135973 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -241,6 +241,12 @@ public class RealtimeIndexTask extends 
AbstractTask } } } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return toolbox.getSegmentAnnouncer().isAnnounced(segment); + } }; // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 8b0f02a8479..9cd2449df01 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -510,6 +510,12 @@ public class TaskLifecycleTest { } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return false; + } }, // segment announcer handoffNotifierFactory, queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java index be5ef6df9ea..2e36d2a1818 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java @@ -59,6 +59,12 @@ public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer } } + @Override + public boolean isAnnounced(DataSegment segment) + { + return announcedSegments.contains(segment); + } + public Set getAnnouncedSegments() { return ImmutableSet.copyOf(announcedSegments); diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoader.java b/server/src/main/java/io/druid/segment/loading/SegmentLoader.java index a9cd5cadc80..2842f91dd51 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoader.java @@ -31,5 +31,5 @@ public interface SegmentLoader 
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException; public Segment getSegment(DataSegment segment) throws SegmentLoadingException; public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; - public void cleanup(DataSegment loadSpec) throws SegmentLoadingException; + public void cleanup(DataSegment segment) throws SegmentLoadingException; } diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 25264d5e69d..503649a5073 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -120,8 +120,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); availableZNode.addSegment(segment); - log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), - availableZNode.getPath()); + log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath()); announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); segmentLookup.put(segment, availableZNode); availableZNodes.add(availableZNode); @@ -196,9 +195,21 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer } } - private String makeServedSegmentPath(){ + @Override + public boolean isAnnounced(DataSegment segment) + { + return segmentLookup.containsKey(segment); + } + + private String makeServedSegmentPath() + { // server.getName() is already in the zk path - return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString())); + return makeServedSegmentPath(UUIDUtils.generateUuid( + server.getHost(), + server.getType(), + server.getTier(), + new DateTime().toString() + )); } 
private String makeServedSegmentPath(String zNode) @@ -241,8 +252,8 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer try { return jsonMapper.readValue( bytes, new TypeReference>() - { - } + { + } ); } catch (Exception e) { diff --git a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java index d6e8c4a27a8..ef6eafab6a7 100644 --- a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java @@ -32,4 +32,9 @@ public interface DataSegmentAnnouncer public void announceSegments(Iterable segments) throws IOException; public void unannounceSegments(Iterable segments) throws IOException; + + /** + * @return true if the segment was already announced, otherwise false + */ + public boolean isAnnounced(DataSegment segment); } diff --git a/server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java deleted file mode 100644 index 64fd4bb90e5..00000000000 --- a/server/src/main/java/io/druid/server/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.coordination; - -import io.druid.timeline.DataSegment; - -import java.io.IOException; - -/** - * This class has the greatest name ever - */ -public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer -{ - private final Iterable dataSegmentAnnouncers; - - public MultipleDataSegmentAnnouncerDataSegmentAnnouncer( - Iterable dataSegmentAnnouncers - ) - { - this.dataSegmentAnnouncers = dataSegmentAnnouncers; - } - - @Override - public void announceSegment(DataSegment segment) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.announceSegment(segment); - } - } - - @Override - public void unannounceSegment(DataSegment segment) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.unannounceSegment(segment); - } - } - - @Override - public void announceSegments(Iterable segments) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.announceSegments(segments); - } - } - - @Override - public void unannounceSegments(Iterable segments) throws IOException - { - for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) { - dataSegmentAnnouncer.unannounceSegments(segments); - } - } -} diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 91749d120e1..77df75097c3 100644 --- 
a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -46,6 +46,7 @@ import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -72,6 +73,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final DataSegmentAnnouncer announcer; private final ServerManager serverManager; private final ScheduledExecutorService exec; + private final ConcurrentSkipListSet segmentsToDelete; private volatile PathChildrenCache loadQueueCache; @@ -98,6 +100,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler this.serverManager = serverManager; this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); + this.segmentsToDelete = new ConcurrentSkipListSet<>(); } @LifecycleStart @@ -289,7 +292,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler return ZkCoordinator.this; } - private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + /** + * Load a single segment. If the segment is loaded successfully, this function simply returns.
Otherwise it will + * throw a SegmentLoadingException + * + * @throws SegmentLoadingException + */ + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException { final boolean loaded; try { @@ -314,7 +323,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler } } } - return loaded; } @Override @@ -322,7 +330,25 @@ public class ZkCoordinator implements DataSegmentChangeHandler { try { log.info("Loading segment %s", segment.getIdentifier()); - if (loadSegment(segment, callback)) { + /* + The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, + and if(segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment + files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads + the segment, which makes dropping segment and downloading segment happen at the same time. + */ + if (segmentsToDelete.contains(segment)) { + /* + Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case, + each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make + things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of + cost of acquiring lock by doing the "contains" check outside the synchronized block. 
+ */ + synchronized (lock) { + segmentsToDelete.remove(segment); + } + } + loadSegment(segment, callback); + if (!announcer.isAnnounced(segment)) { try { announcer.announceSegment(segment); } @@ -369,8 +395,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler numSegments, segment.getIdentifier() ); - final boolean loaded = loadSegment(segment, callback); - if (loaded) { + loadSegment(segment, callback); + if (!announcer.isAnnounced(segment)) { try { backgroundSegmentAnnouncer.announceSegment(segment); } @@ -426,6 +452,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler { try { announcer.unannounceSegment(segment); + segmentsToDelete.add(segment); log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); exec.schedule( @@ -435,11 +462,15 @@ public class ZkCoordinator implements DataSegmentChangeHandler public void run() { try { - serverManager.dropSegment(segment); + synchronized (lock) { + if (segmentsToDelete.remove(segment)) { + serverManager.dropSegment(segment); - File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); - if (!segmentInfoCacheFile.delete()) { - log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); + if (!segmentInfoCacheFile.delete()) { + log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); + } + } } } catch (Exception e) { diff --git a/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java index a6b4b61468c..7b5137e7fb5 100644 --- a/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/io/druid/segment/loading/CacheTestSegmentLoader.java @@ -28,12 +28,17 @@ import org.joda.time.Interval; import java.io.File; import java.io.IOException; +import 
java.util.HashSet; import java.util.Map; +import java.util.Set; /** */ public class CacheTestSegmentLoader implements SegmentLoader { + + private final Set segmentsInTrash = new HashSet<>(); + @Override public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException { @@ -84,7 +89,13 @@ public class CacheTestSegmentLoader implements SegmentLoader } @Override - public void cleanup(DataSegment loadSpec) throws SegmentLoadingException + public void cleanup(DataSegment segment) throws SegmentLoadingException { + segmentsInTrash.add(segment); + } + + public Set getSegmentsInTrash() + { + return segmentsInTrash; } } diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index 8c0fae715af..04dcdebf3c1 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -59,20 +60,39 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** */ public class ZkCoordinatorTest extends CuratorTestBase { - private static final Logger log = new Logger(ZkCoordinatorTest.class); public static final int COUNT = 50; + + private static final Logger log = 
new Logger(ZkCoordinatorTest.class); + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private final DruidServerMetadata me = new DruidServerMetadata( + "dummyServer", + "dummyHost", + 0, + "dummyType", + "normal", + 0 + ); + private ZkCoordinator zkCoordinator; private ServerManager serverManager; private DataSegmentAnnouncer announcer; private File infoDir; private AtomicInteger announceCount; + private ConcurrentSkipListSet segmentsAnnouncedByMe; + private CacheTestSegmentLoader segmentLoader; + private List scheduledRunnable; @Before public void setUp() throws Exception @@ -92,8 +112,12 @@ public class ZkCoordinatorTest extends CuratorTestBase throw new RuntimeException(e); } + scheduledRunnable = Lists.newArrayList(); + + segmentLoader = new CacheTestSegmentLoader(); + serverManager = new ServerManager( - new CacheTestSegmentLoader(), + segmentLoader, new NoopQueryRunnerFactoryConglomerate(), new NoopServiceEmitter(), MoreExecutors.sameThreadExecutor(), @@ -103,8 +127,6 @@ public class ZkCoordinatorTest extends CuratorTestBase new CacheConfig() ); - final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0); - final ZkPathsConfig zkPaths = new ZkPathsConfig() { @Override @@ -114,6 +136,7 @@ public class ZkCoordinatorTest extends CuratorTestBase } }; + segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); announceCount = new AtomicInteger(0); announcer = new DataSegmentAnnouncer() { @@ -128,6 +151,7 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void announceSegment(DataSegment segment) throws IOException { + segmentsAnnouncedByMe.add(segment); announceCount.incrementAndGet(); delegate.announceSegment(segment); } @@ -135,6 +159,7 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void unannounceSegment(DataSegment segment) throws IOException { + segmentsAnnouncedByMe.remove(segment); announceCount.decrementAndGet(); 
delegate.unannounceSegment(segment); } @@ -142,6 +167,9 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void announceSegments(Iterable segments) throws IOException { + for (DataSegment segment : segments) { + segmentsAnnouncedByMe.add(segment); + } announceCount.addAndGet(Iterables.size(segments)); delegate.announceSegments(segments); } @@ -149,9 +177,18 @@ public class ZkCoordinatorTest extends CuratorTestBase @Override public void unannounceSegments(Iterable segments) throws IOException { + for (DataSegment segment : segments) { + segmentsAnnouncedByMe.remove(segment); + } announceCount.addAndGet(-Iterables.size(segments)); delegate.unannounceSegments(segments); } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return segmentsAnnouncedByMe.contains(segment); + } }; zkCoordinator = new ZkCoordinator( @@ -175,13 +212,42 @@ { return 50; } + + @Override + public int getDropSegmentDelayMillis() + { + return 0; + } }, zkPaths, me, announcer, curator, serverManager, - ScheduledExecutors.createFactory(new Lifecycle()) + new ScheduledExecutorFactory() + { + @Override + public ScheduledExecutorService create(int corePoolSize, String nameFormat) + { + /* + Override normal behavior by adding the runnable to a list so that you can make sure + all the scheduled runnables are executed by explicitly calling run() on each item in the list + */ + return new ScheduledThreadPoolExecutor( + corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build() + ) + { + @Override + public ScheduledFuture schedule( + Runnable command, long delay, TimeUnit unit + ) + { + scheduledRunnable.add(command); + return null; + } + }; + } + } ); } @@ -191,6 +257,114 @@ tearDownServerAndCurator(); } + /** + * Steps: + * 1. removeSegment() schedules a delete runnable that deletes segment files, + * 2.
addSegment() successfully loads the segment and announces it + * 3. scheduled delete task executes and realizes it should not delete the segment files. + */ + @Test + public void testSegmentLoading1() throws Exception + { + zkCoordinator.start(); + + final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01")); + + zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + + zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + /* + make sure the scheduled runnable that "deletes" segment files has been executed. + Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in + ZkCoordinator, the scheduled runnable will not actually delete segment files. + */ + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + + Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); + + zkCoordinator.stop(); + } + + /** + * Steps: + * 1. addSegment() successfully loads the segment and announces it + * 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files + * 3. addSegment() calls loadSegment() and announces it again + * 4. scheduled delete task executes and realizes it should not delete the segment files.
+ */ + @Test + public void testSegmentLoading2() throws Exception + { + zkCoordinator.start(); + + final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01")); + + zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + + zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + Assert.assertFalse(segmentsAnnouncedByMe.contains(segment)); + + zkCoordinator.addSegment(segment, new DataSegmentChangeCallback() + { + @Override + public void execute() + { + // do nothing + } + }); + + /* + make sure the scheduled runnable that "deletes" segment files has been executed. + Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in + ZkCoordinator, the scheduled runnable will not actually delete segment files. + */ + for (Runnable runnable : scheduledRunnable) { + runnable.run(); + } + + Assert.assertTrue(segmentsAnnouncedByMe.contains(segment)); + Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment)); + + zkCoordinator.stop(); + } + @Test public void testLoadCache() throws Exception { diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index a4b51777d08..2f91d383413 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -152,5 +152,11 @@ public class CliRealtimeExample extends ServerRunnable { // do nothing } + + @Override + public boolean isAnnounced(DataSegment segment) + { + return false; + } } }