Fix loading segment for historical

Historical will drop a segment that shouldn't be dropped in the following scenario:
A historical node tried to load segmentA but failed with a SegmentLoadingException,
so ZkCoordinator called removeSegment(segmentA, blah) to schedule a runnable that would drop segmentA by deleting its files. Before that runnable executed, another LOAD request was sent to this historical; this time the historical succeeded in loading segmentA and announced it. Later, the scheduled drop-of-segment runnable started executing and removed the segment files while the historical was still announcing segmentA.
This commit is contained in:
Bingkun Guo 2015-12-17 15:26:32 -06:00
parent a54c726726
commit c4ad50f92c
11 changed files with 278 additions and 93 deletions

View File

@ -241,6 +241,12 @@ public class RealtimeIndexTask extends AbstractTask
} }
} }
} }
@Override
public boolean isAnnounced(DataSegment segment)
{
return toolbox.getSegmentAnnouncer().isAnnounced(segment);
}
}; };
// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink

View File

@ -510,6 +510,12 @@ public class TaskLifecycleTest
{ {
} }
@Override
public boolean isAnnounced(DataSegment segment)
{
return false;
}
}, // segment announcer }, // segment announcer
handoffNotifierFactory, handoffNotifierFactory,
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective

View File

@ -59,6 +59,12 @@ public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer
} }
} }
@Override
public boolean isAnnounced(DataSegment segment)
{
return announcedSegments.contains(segment);
}
public Set<DataSegment> getAnnouncedSegments() public Set<DataSegment> getAnnouncedSegments()
{ {
return ImmutableSet.copyOf(announcedSegments); return ImmutableSet.copyOf(announcedSegments);

View File

@ -31,5 +31,5 @@ public interface SegmentLoader
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException; public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException;
public Segment getSegment(DataSegment segment) throws SegmentLoadingException; public Segment getSegment(DataSegment segment) throws SegmentLoadingException;
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException; public void cleanup(DataSegment segment) throws SegmentLoadingException;
} }

View File

@ -120,8 +120,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment); availableZNode.addSegment(segment);
log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath());
availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode); segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode); availableZNodes.add(availableZNode);
@ -196,9 +195,21 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
} }
} }
private String makeServedSegmentPath(){ @Override
public boolean isAnnounced(DataSegment segment)
{
return segmentLookup.containsKey(segment);
}
private String makeServedSegmentPath()
{
// server.getName() is already in the zk path // server.getName() is already in the zk path
return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString())); return makeServedSegmentPath(UUIDUtils.generateUuid(
server.getHost(),
server.getType(),
server.getTier(),
new DateTime().toString()
));
} }
private String makeServedSegmentPath(String zNode) private String makeServedSegmentPath(String zNode)
@ -241,8 +252,8 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
try { try {
return jsonMapper.readValue( return jsonMapper.readValue(
bytes, new TypeReference<Set<DataSegment>>() bytes, new TypeReference<Set<DataSegment>>()
{ {
} }
); );
} }
catch (Exception e) { catch (Exception e) {

View File

@ -32,4 +32,9 @@ public interface DataSegmentAnnouncer
public void announceSegments(Iterable<DataSegment> segments) throws IOException; public void announceSegments(Iterable<DataSegment> segments) throws IOException;
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException; public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
/**
* @return true if the segment was already announced, otherwise false
*/
public boolean isAnnounced(DataSegment segment);
} }

View File

@ -1,71 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import io.druid.timeline.DataSegment;
import java.io.IOException;
/**
* This class has the greatest name ever
*/
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
{
private final Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers;
public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers
)
{
this.dataSegmentAnnouncers = dataSegmentAnnouncers;
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.announceSegment(segment);
}
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.unannounceSegment(segment);
}
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.announceSegments(segments);
}
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
dataSegmentAnnouncer.unannounceSegments(segments);
}
}
}

View File

@ -46,6 +46,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -72,6 +73,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final DataSegmentAnnouncer announcer; private final DataSegmentAnnouncer announcer;
private final ServerManager serverManager; private final ServerManager serverManager;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private volatile PathChildrenCache loadQueueCache; private volatile PathChildrenCache loadQueueCache;
@ -98,6 +100,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
this.serverManager = serverManager; this.serverManager = serverManager;
this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
this.segmentsToDelete = new ConcurrentSkipListSet<>();
} }
@LifecycleStart @LifecycleStart
@ -289,7 +292,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler
return ZkCoordinator.this; return ZkCoordinator.this;
} }
private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException /**
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
* throw a SegmentLoadingException
*
* @throws SegmentLoadingException
*/
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
{ {
final boolean loaded; final boolean loaded;
try { try {
@ -314,7 +323,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
} }
} }
} }
return loaded;
} }
@Override @Override
@ -322,7 +330,25 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{ {
try { try {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment %s", segment.getIdentifier());
if (loadSegment(segment, callback)) { /*
The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts,
and if(segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment
files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads
the segment, which makes dropping segment and downloading segment happen at the same time.
*/
if (segmentsToDelete.contains(segment)) {
/*
Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case,
each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make
things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of
cost of acquiring lock by doing the "contains" check outside the synchronized block.
*/
synchronized (lock) {
segmentsToDelete.remove(segment);
}
}
loadSegment(segment, callback);
if (!announcer.isAnnounced(segment)) {
try { try {
announcer.announceSegment(segment); announcer.announceSegment(segment);
} }
@ -369,8 +395,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
numSegments, numSegments,
segment.getIdentifier() segment.getIdentifier()
); );
final boolean loaded = loadSegment(segment, callback); loadSegment(segment, callback);
if (loaded) { if (!announcer.isAnnounced(segment)) {
try { try {
backgroundSegmentAnnouncer.announceSegment(segment); backgroundSegmentAnnouncer.announceSegment(segment);
} }
@ -426,6 +452,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{ {
try { try {
announcer.unannounceSegment(segment); announcer.unannounceSegment(segment);
segmentsToDelete.add(segment);
log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis()); log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis());
exec.schedule( exec.schedule(
@ -435,11 +462,15 @@ public class ZkCoordinator implements DataSegmentChangeHandler
public void run() public void run()
{ {
try { try {
serverManager.dropSegment(segment); synchronized (lock) {
if (segmentsToDelete.remove(segment)) {
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) { if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
} }
} }
catch (Exception e) { catch (Exception e) {

View File

@ -28,12 +28,17 @@ import org.joda.time.Interval;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
*/ */
public class CacheTestSegmentLoader implements SegmentLoader public class CacheTestSegmentLoader implements SegmentLoader
{ {
private final Set<DataSegment> segmentsInTrash = new HashSet<>();
@Override @Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
{ {
@ -84,7 +89,13 @@ public class CacheTestSegmentLoader implements SegmentLoader
} }
@Override @Override
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException public void cleanup(DataSegment segment) throws SegmentLoadingException
{ {
segmentsInTrash.add(segment);
}
public Set<DataSegment> getSegmentsInTrash()
{
return segmentsInTrash;
} }
} }

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
@ -59,20 +60,39 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
public class ZkCoordinatorTest extends CuratorTestBase public class ZkCoordinatorTest extends CuratorTestBase
{ {
private static final Logger log = new Logger(ZkCoordinatorTest.class);
public static final int COUNT = 50; public static final int COUNT = 50;
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
"dummyHost",
0,
"dummyType",
"normal",
0
);
private ZkCoordinator zkCoordinator; private ZkCoordinator zkCoordinator;
private ServerManager serverManager; private ServerManager serverManager;
private DataSegmentAnnouncer announcer; private DataSegmentAnnouncer announcer;
private File infoDir; private File infoDir;
private AtomicInteger announceCount; private AtomicInteger announceCount;
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
private CacheTestSegmentLoader segmentLoader;
private List<Runnable> scheduledRunnable;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -92,8 +112,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
throw new RuntimeException(e); throw new RuntimeException(e);
} }
scheduledRunnable = Lists.newArrayList();
segmentLoader = new CacheTestSegmentLoader();
serverManager = new ServerManager( serverManager = new ServerManager(
new CacheTestSegmentLoader(), segmentLoader,
new NoopQueryRunnerFactoryConglomerate(), new NoopQueryRunnerFactoryConglomerate(),
new NoopServiceEmitter(), new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
@ -103,8 +127,6 @@ public class ZkCoordinatorTest extends CuratorTestBase
new CacheConfig() new CacheConfig()
); );
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
final ZkPathsConfig zkPaths = new ZkPathsConfig() final ZkPathsConfig zkPaths = new ZkPathsConfig()
{ {
@Override @Override
@ -114,6 +136,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
} }
}; };
segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
announceCount = new AtomicInteger(0); announceCount = new AtomicInteger(0);
announcer = new DataSegmentAnnouncer() announcer = new DataSegmentAnnouncer()
{ {
@ -128,6 +151,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
@Override @Override
public void announceSegment(DataSegment segment) throws IOException public void announceSegment(DataSegment segment) throws IOException
{ {
segmentsAnnouncedByMe.add(segment);
announceCount.incrementAndGet(); announceCount.incrementAndGet();
delegate.announceSegment(segment); delegate.announceSegment(segment);
} }
@ -135,6 +159,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
@Override @Override
public void unannounceSegment(DataSegment segment) throws IOException public void unannounceSegment(DataSegment segment) throws IOException
{ {
segmentsAnnouncedByMe.remove(segment);
announceCount.decrementAndGet(); announceCount.decrementAndGet();
delegate.unannounceSegment(segment); delegate.unannounceSegment(segment);
} }
@ -142,6 +167,9 @@ public class ZkCoordinatorTest extends CuratorTestBase
@Override @Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException public void announceSegments(Iterable<DataSegment> segments) throws IOException
{ {
for (DataSegment segment : segments) {
segmentsAnnouncedByMe.add(segment);
}
announceCount.addAndGet(Iterables.size(segments)); announceCount.addAndGet(Iterables.size(segments));
delegate.announceSegments(segments); delegate.announceSegments(segments);
} }
@ -149,9 +177,18 @@ public class ZkCoordinatorTest extends CuratorTestBase
@Override @Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{ {
for (DataSegment segment : segments) {
segmentsAnnouncedByMe.remove(segment);
}
announceCount.addAndGet(-Iterables.size(segments)); announceCount.addAndGet(-Iterables.size(segments));
delegate.unannounceSegments(segments); delegate.unannounceSegments(segments);
} }
@Override
public boolean isAnnounced(DataSegment segment)
{
return segmentsAnnouncedByMe.contains(segment);
}
}; };
zkCoordinator = new ZkCoordinator( zkCoordinator = new ZkCoordinator(
@ -175,13 +212,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
{ {
return 50; return 50;
} }
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
}, },
zkPaths, zkPaths,
me, me,
announcer, announcer,
curator, curator,
serverManager, serverManager,
ScheduledExecutors.createFactory(new Lifecycle()) new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int corePoolSize, String nameFormat)
{
/*
Override normal behavior by adding the runnable to a list so that you can make sure
all the scheduled runnables are executed by explicitly calling run() on each item in the list
*/
return new ScheduledThreadPoolExecutor(
corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
)
{
@Override
public ScheduledFuture<?> schedule(
Runnable command, long delay, TimeUnit unit
)
{
scheduledRunnable.add(command);
return null;
}
};
}
}
); );
} }
@ -191,6 +257,114 @@ public class ZkCoordinatorTest extends CuratorTestBase
tearDownServerAndCurator(); tearDownServerAndCurator();
} }
/**
* Steps:
* 1. removeSegment() schedules a delete runnable that deletes segment files,
* 2. addSegment() successfully loads the segment and announces it
* 3. scheduled delete task executes and realizes it should not delete the segment files.
*/
@Test
public void testSegmentLoading1() throws Exception
{
zkCoordinator.start();
final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01"));
zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
/*
make sure the scheduled runnable that "deletes" segment files has been executed.
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in
ZkCoordinator, the scheduled runnable will not actually delete segment files.
*/
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
zkCoordinator.stop();
}
/**
* Steps:
* 1. addSegment() successfully loads the segment and announces it
* 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files
* 3. addSegment() calls loadSegment() and announces it again
* 4. scheduled delete task executes and realizes it should not delete the segment files.
*/
@Test
public void testSegmentLoading2() throws Exception
{
zkCoordinator.start();
final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01"));
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
/*
make sure the scheduled runnable that "deletes" segment files has been executed.
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in
ZkCoordinator, the scheduled runnable will not actually delete segment files.
*/
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
zkCoordinator.stop();
}
@Test @Test
public void testLoadCache() throws Exception public void testLoadCache() throws Exception
{ {

View File

@ -152,5 +152,11 @@ public class CliRealtimeExample extends ServerRunnable
{ {
// do nothing // do nothing
} }
@Override
public boolean isAnnounced(DataSegment segment)
{
return false;
}
} }
} }