mirror of
https://github.com/apache/druid.git
synced 2025-02-13 05:25:11 +00:00
Merge pull request #2118 from guobingkun/fix_segment_loading
Fix loading segment for historical
This commit is contained in:
commit
fe841fd961
@ -242,6 +242,12 @@ public class RealtimeIndexTask extends AbstractTask
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnnounced(DataSegment segment)
|
||||||
|
{
|
||||||
|
return toolbox.getSegmentAnnouncer().isAnnounced(segment);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
|
// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
|
||||||
|
@ -513,6 +513,12 @@ public class TaskLifecycleTest
|
|||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnnounced(DataSegment segment)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}, // segment announcer
|
}, // segment announcer
|
||||||
handoffNotifierFactory,
|
handoffNotifierFactory,
|
||||||
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
|
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
|
||||||
|
@ -59,6 +59,12 @@ public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnnounced(DataSegment segment)
|
||||||
|
{
|
||||||
|
return announcedSegments.contains(segment);
|
||||||
|
}
|
||||||
|
|
||||||
public Set<DataSegment> getAnnouncedSegments()
|
public Set<DataSegment> getAnnouncedSegments()
|
||||||
{
|
{
|
||||||
return ImmutableSet.copyOf(announcedSegments);
|
return ImmutableSet.copyOf(announcedSegments);
|
||||||
|
@ -31,5 +31,5 @@ public interface SegmentLoader
|
|||||||
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException;
|
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException;
|
||||||
public Segment getSegment(DataSegment segment) throws SegmentLoadingException;
|
public Segment getSegment(DataSegment segment) throws SegmentLoadingException;
|
||||||
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
|
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
|
||||||
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException;
|
public void cleanup(DataSegment segment) throws SegmentLoadingException;
|
||||||
}
|
}
|
||||||
|
@ -120,8 +120,7 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||||||
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
|
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
|
||||||
availableZNode.addSegment(segment);
|
availableZNode.addSegment(segment);
|
||||||
|
|
||||||
log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(),
|
log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath());
|
||||||
availableZNode.getPath());
|
|
||||||
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
|
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
|
||||||
segmentLookup.put(segment, availableZNode);
|
segmentLookup.put(segment, availableZNode);
|
||||||
availableZNodes.add(availableZNode);
|
availableZNodes.add(availableZNode);
|
||||||
@ -196,9 +195,21 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String makeServedSegmentPath(){
|
@Override
|
||||||
|
public boolean isAnnounced(DataSegment segment)
|
||||||
|
{
|
||||||
|
return segmentLookup.containsKey(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String makeServedSegmentPath()
|
||||||
|
{
|
||||||
// server.getName() is already in the zk path
|
// server.getName() is already in the zk path
|
||||||
return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString()));
|
return makeServedSegmentPath(UUIDUtils.generateUuid(
|
||||||
|
server.getHost(),
|
||||||
|
server.getType(),
|
||||||
|
server.getTier(),
|
||||||
|
new DateTime().toString()
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
private String makeServedSegmentPath(String zNode)
|
private String makeServedSegmentPath(String zNode)
|
||||||
|
@ -32,4 +32,9 @@ public interface DataSegmentAnnouncer
|
|||||||
public void announceSegments(Iterable<DataSegment> segments) throws IOException;
|
public void announceSegments(Iterable<DataSegment> segments) throws IOException;
|
||||||
|
|
||||||
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
|
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the segment was already announced, otherwise false
|
||||||
|
*/
|
||||||
|
public boolean isAnnounced(DataSegment segment);
|
||||||
}
|
}
|
||||||
|
@ -1,71 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. Metamarkets licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.druid.server.coordination;
|
|
||||||
|
|
||||||
import io.druid.timeline.DataSegment;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class has the greatest name ever
|
|
||||||
*/
|
|
||||||
public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
|
|
||||||
{
|
|
||||||
private final Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers;
|
|
||||||
|
|
||||||
public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
|
|
||||||
Iterable<DataSegmentAnnouncer> dataSegmentAnnouncers
|
|
||||||
)
|
|
||||||
{
|
|
||||||
this.dataSegmentAnnouncers = dataSegmentAnnouncers;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void announceSegment(DataSegment segment) throws IOException
|
|
||||||
{
|
|
||||||
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
|
|
||||||
dataSegmentAnnouncer.announceSegment(segment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unannounceSegment(DataSegment segment) throws IOException
|
|
||||||
{
|
|
||||||
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
|
|
||||||
dataSegmentAnnouncer.unannounceSegment(segment);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
|
||||||
{
|
|
||||||
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
|
|
||||||
dataSegmentAnnouncer.announceSegments(segments);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
|
|
||||||
{
|
|
||||||
for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
|
|
||||||
dataSegmentAnnouncer.unannounceSegments(segments);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -46,6 +46,7 @@ import java.io.File;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
@ -72,6 +73,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
private final DataSegmentAnnouncer announcer;
|
private final DataSegmentAnnouncer announcer;
|
||||||
private final ServerManager serverManager;
|
private final ServerManager serverManager;
|
||||||
private final ScheduledExecutorService exec;
|
private final ScheduledExecutorService exec;
|
||||||
|
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
|
||||||
|
|
||||||
|
|
||||||
private volatile PathChildrenCache loadQueueCache;
|
private volatile PathChildrenCache loadQueueCache;
|
||||||
@ -98,6 +100,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
this.serverManager = serverManager;
|
this.serverManager = serverManager;
|
||||||
|
|
||||||
this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
|
this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
|
||||||
|
this.segmentsToDelete = new ConcurrentSkipListSet<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@LifecycleStart
|
@LifecycleStart
|
||||||
@ -289,7 +292,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
return ZkCoordinator.this;
|
return ZkCoordinator.this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
|
/**
|
||||||
|
* Load a single segment. If the segment is loaded succesfully, this function simply returns. Otherwise it will
|
||||||
|
* throw a SegmentLoadingException
|
||||||
|
*
|
||||||
|
* @throws SegmentLoadingException
|
||||||
|
*/
|
||||||
|
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
final boolean loaded;
|
final boolean loaded;
|
||||||
try {
|
try {
|
||||||
@ -314,7 +323,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return loaded;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -322,7 +330,25 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
log.info("Loading segment %s", segment.getIdentifier());
|
log.info("Loading segment %s", segment.getIdentifier());
|
||||||
if (loadSegment(segment, callback)) {
|
/*
|
||||||
|
The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts,
|
||||||
|
and if(segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment
|
||||||
|
files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads
|
||||||
|
the segment, which makes dropping segment and downloading segment happen at the same time.
|
||||||
|
*/
|
||||||
|
if (segmentsToDelete.contains(segment)) {
|
||||||
|
/*
|
||||||
|
Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case,
|
||||||
|
each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make
|
||||||
|
things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of
|
||||||
|
cost of acquiring lock by doing the "contains" check outside the synchronized block.
|
||||||
|
*/
|
||||||
|
synchronized (lock) {
|
||||||
|
segmentsToDelete.remove(segment);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
loadSegment(segment, callback);
|
||||||
|
if (!announcer.isAnnounced(segment)) {
|
||||||
try {
|
try {
|
||||||
announcer.announceSegment(segment);
|
announcer.announceSegment(segment);
|
||||||
}
|
}
|
||||||
@ -369,8 +395,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
numSegments,
|
numSegments,
|
||||||
segment.getIdentifier()
|
segment.getIdentifier()
|
||||||
);
|
);
|
||||||
final boolean loaded = loadSegment(segment, callback);
|
loadSegment(segment, callback);
|
||||||
if (loaded) {
|
if (!announcer.isAnnounced(segment)) {
|
||||||
try {
|
try {
|
||||||
backgroundSegmentAnnouncer.announceSegment(segment);
|
backgroundSegmentAnnouncer.announceSegment(segment);
|
||||||
}
|
}
|
||||||
@ -426,6 +452,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
announcer.unannounceSegment(segment);
|
announcer.unannounceSegment(segment);
|
||||||
|
segmentsToDelete.add(segment);
|
||||||
|
|
||||||
log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis());
|
log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis());
|
||||||
exec.schedule(
|
exec.schedule(
|
||||||
@ -435,6 +462,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
synchronized (lock) {
|
||||||
|
if (segmentsToDelete.remove(segment)) {
|
||||||
serverManager.dropSegment(segment);
|
serverManager.dropSegment(segment);
|
||||||
|
|
||||||
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
|
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
|
||||||
@ -442,6 +471,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
|
|||||||
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
|
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
|
log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
|
||||||
.addData("segment", segment)
|
.addData("segment", segment)
|
||||||
|
@ -28,12 +28,17 @@ import org.joda.time.Interval;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class CacheTestSegmentLoader implements SegmentLoader
|
public class CacheTestSegmentLoader implements SegmentLoader
|
||||||
{
|
{
|
||||||
|
|
||||||
|
private final Set<DataSegment> segmentsInTrash = new HashSet<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
|
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
@ -84,7 +89,13 @@ public class CacheTestSegmentLoader implements SegmentLoader
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanup(DataSegment loadSpec) throws SegmentLoadingException
|
public void cleanup(DataSegment segment) throws SegmentLoadingException
|
||||||
{
|
{
|
||||||
|
segmentsInTrash.add(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<DataSegment> getSegmentsInTrash()
|
||||||
|
{
|
||||||
|
return segmentsInTrash;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
|
|||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import com.google.inject.Binder;
|
import com.google.inject.Binder;
|
||||||
import com.google.inject.Guice;
|
import com.google.inject.Guice;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
@ -59,20 +60,39 @@ import java.io.IOException;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class ZkCoordinatorTest extends CuratorTestBase
|
public class ZkCoordinatorTest extends CuratorTestBase
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
|
||||||
public static final int COUNT = 50;
|
public static final int COUNT = 50;
|
||||||
|
|
||||||
|
private static final Logger log = new Logger(ZkCoordinatorTest.class);
|
||||||
|
|
||||||
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
|
||||||
|
private final DruidServerMetadata me = new DruidServerMetadata(
|
||||||
|
"dummyServer",
|
||||||
|
"dummyHost",
|
||||||
|
0,
|
||||||
|
"dummyType",
|
||||||
|
"normal",
|
||||||
|
0
|
||||||
|
);
|
||||||
|
|
||||||
private ZkCoordinator zkCoordinator;
|
private ZkCoordinator zkCoordinator;
|
||||||
private ServerManager serverManager;
|
private ServerManager serverManager;
|
||||||
private DataSegmentAnnouncer announcer;
|
private DataSegmentAnnouncer announcer;
|
||||||
private File infoDir;
|
private File infoDir;
|
||||||
private AtomicInteger announceCount;
|
private AtomicInteger announceCount;
|
||||||
|
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
|
||||||
|
private CacheTestSegmentLoader segmentLoader;
|
||||||
|
private List<Runnable> scheduledRunnable;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception
|
public void setUp() throws Exception
|
||||||
@ -92,8 +112,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scheduledRunnable = Lists.newArrayList();
|
||||||
|
|
||||||
|
segmentLoader = new CacheTestSegmentLoader();
|
||||||
|
|
||||||
serverManager = new ServerManager(
|
serverManager = new ServerManager(
|
||||||
new CacheTestSegmentLoader(),
|
segmentLoader,
|
||||||
new NoopQueryRunnerFactoryConglomerate(),
|
new NoopQueryRunnerFactoryConglomerate(),
|
||||||
new NoopServiceEmitter(),
|
new NoopServiceEmitter(),
|
||||||
MoreExecutors.sameThreadExecutor(),
|
MoreExecutors.sameThreadExecutor(),
|
||||||
@ -103,8 +127,6 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
new CacheConfig()
|
new CacheConfig()
|
||||||
);
|
);
|
||||||
|
|
||||||
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
|
|
||||||
|
|
||||||
final ZkPathsConfig zkPaths = new ZkPathsConfig()
|
final ZkPathsConfig zkPaths = new ZkPathsConfig()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
@ -114,6 +136,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
|
||||||
announceCount = new AtomicInteger(0);
|
announceCount = new AtomicInteger(0);
|
||||||
announcer = new DataSegmentAnnouncer()
|
announcer = new DataSegmentAnnouncer()
|
||||||
{
|
{
|
||||||
@ -128,6 +151,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
@Override
|
@Override
|
||||||
public void announceSegment(DataSegment segment) throws IOException
|
public void announceSegment(DataSegment segment) throws IOException
|
||||||
{
|
{
|
||||||
|
segmentsAnnouncedByMe.add(segment);
|
||||||
announceCount.incrementAndGet();
|
announceCount.incrementAndGet();
|
||||||
delegate.announceSegment(segment);
|
delegate.announceSegment(segment);
|
||||||
}
|
}
|
||||||
@ -135,6 +159,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
@Override
|
@Override
|
||||||
public void unannounceSegment(DataSegment segment) throws IOException
|
public void unannounceSegment(DataSegment segment) throws IOException
|
||||||
{
|
{
|
||||||
|
segmentsAnnouncedByMe.remove(segment);
|
||||||
announceCount.decrementAndGet();
|
announceCount.decrementAndGet();
|
||||||
delegate.unannounceSegment(segment);
|
delegate.unannounceSegment(segment);
|
||||||
}
|
}
|
||||||
@ -142,6 +167,9 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
@Override
|
@Override
|
||||||
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
public void announceSegments(Iterable<DataSegment> segments) throws IOException
|
||||||
{
|
{
|
||||||
|
for (DataSegment segment : segments) {
|
||||||
|
segmentsAnnouncedByMe.add(segment);
|
||||||
|
}
|
||||||
announceCount.addAndGet(Iterables.size(segments));
|
announceCount.addAndGet(Iterables.size(segments));
|
||||||
delegate.announceSegments(segments);
|
delegate.announceSegments(segments);
|
||||||
}
|
}
|
||||||
@ -149,9 +177,18 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
@Override
|
@Override
|
||||||
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
|
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
|
||||||
{
|
{
|
||||||
|
for (DataSegment segment : segments) {
|
||||||
|
segmentsAnnouncedByMe.remove(segment);
|
||||||
|
}
|
||||||
announceCount.addAndGet(-Iterables.size(segments));
|
announceCount.addAndGet(-Iterables.size(segments));
|
||||||
delegate.unannounceSegments(segments);
|
delegate.unannounceSegments(segments);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnnounced(DataSegment segment)
|
||||||
|
{
|
||||||
|
return segmentsAnnouncedByMe.contains(segment);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
zkCoordinator = new ZkCoordinator(
|
zkCoordinator = new ZkCoordinator(
|
||||||
@ -175,13 +212,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
{
|
{
|
||||||
return 50;
|
return 50;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getDropSegmentDelayMillis()
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
zkPaths,
|
zkPaths,
|
||||||
me,
|
me,
|
||||||
announcer,
|
announcer,
|
||||||
curator,
|
curator,
|
||||||
serverManager,
|
serverManager,
|
||||||
ScheduledExecutors.createFactory(new Lifecycle())
|
new ScheduledExecutorFactory()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ScheduledExecutorService create(int corePoolSize, String nameFormat)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Override normal behavoir by adding the runnable to a list so that you can make sure
|
||||||
|
all the shceduled runnables are executed by explicitly calling run() on each item in the list
|
||||||
|
*/
|
||||||
|
return new ScheduledThreadPoolExecutor(
|
||||||
|
corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
|
||||||
|
)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> schedule(
|
||||||
|
Runnable command, long delay, TimeUnit unit
|
||||||
|
)
|
||||||
|
{
|
||||||
|
scheduledRunnable.add(command);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,6 +257,114 @@ public class ZkCoordinatorTest extends CuratorTestBase
|
|||||||
tearDownServerAndCurator();
|
tearDownServerAndCurator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Steps:
|
||||||
|
* 1. removeSegment() schedules a delete runnable that deletes segment files,
|
||||||
|
* 2. addSegment() succesfully loads the segment and annouces it
|
||||||
|
* 3. scheduled delete task executes and realizes it should not delete the segment files.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSegmentLoading1() throws Exception
|
||||||
|
{
|
||||||
|
zkCoordinator.start();
|
||||||
|
|
||||||
|
final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01"));
|
||||||
|
|
||||||
|
zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute()
|
||||||
|
{
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
|
||||||
|
|
||||||
|
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute()
|
||||||
|
{
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/*
|
||||||
|
make sure the scheduled runnable that "deletes" segment files has been executed.
|
||||||
|
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in
|
||||||
|
ZkCoordinator, the scheduled runnable will not actually delete segment files.
|
||||||
|
*/
|
||||||
|
for (Runnable runnable : scheduledRunnable) {
|
||||||
|
runnable.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
|
||||||
|
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
|
||||||
|
|
||||||
|
zkCoordinator.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Steps:
|
||||||
|
* 1. addSegment() succesfully loads the segment and annouces it
|
||||||
|
* 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files
|
||||||
|
* 3. addSegment() calls loadSegment() and annouces it again
|
||||||
|
* 4. scheduled delete task executes and realizes it should not delete the segment files.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSegmentLoading2() throws Exception
|
||||||
|
{
|
||||||
|
zkCoordinator.start();
|
||||||
|
|
||||||
|
final DataSegment segment = makeSegment("test", "1", new Interval("P1d/2011-04-01"));
|
||||||
|
|
||||||
|
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute()
|
||||||
|
{
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
|
||||||
|
|
||||||
|
zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute()
|
||||||
|
{
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
|
||||||
|
|
||||||
|
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void execute()
|
||||||
|
{
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/*
|
||||||
|
make sure the scheduled runnable that "deletes" segment files has been executed.
|
||||||
|
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in
|
||||||
|
ZkCoordinator, the scheduled runnable will not actually delete segment files.
|
||||||
|
*/
|
||||||
|
for (Runnable runnable : scheduledRunnable) {
|
||||||
|
runnable.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
|
||||||
|
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
|
||||||
|
|
||||||
|
zkCoordinator.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLoadCache() throws Exception
|
public void testLoadCache() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -152,5 +152,11 @@ public class CliRealtimeExample extends ServerRunnable
|
|||||||
{
|
{
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAnnounced(DataSegment segment)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user