commit 84233238b1
Author: fjy
Date:   2013-12-16 12:01:55 -08:00

    Merge branch 'master' of github.com:metamx/druid

5 changed files with 66 additions and 36 deletions

ServerManager.java

@@ -116,7 +116,13 @@ public class ServerManager implements QuerySegmentWalker
     return segmentLoader.isSegmentLoaded(segment);
   }
 
-  public void loadSegment(final DataSegment segment) throws SegmentLoadingException
+  /**
+   * Load a single segment.
+   *
+   * @param segment segment to load
+   * @return true if the segment was newly loaded, false if it was already loaded
+   * @throws SegmentLoadingException if the segment cannot be loaded
+   */
+  public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
   {
     final Segment adapter;
     try {
@@ -150,8 +156,8 @@ public class ServerManager implements QuerySegmentWalker
         segment.getVersion()
     );
     if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
-      log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
-      throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier());
+      log.warn("Told to load an adapter for a segment[%s] that already exists", segment.getIdentifier());
+      return false;
     }
 
     loadedIntervals.add(
@@ -165,6 +171,7 @@ public class ServerManager implements QuerySegmentWalker
     synchronized (dataSourceCounts) {
       dataSourceCounts.add(dataSource, 1L);
     }
 
+    return true;
   }
 }
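The substantive change in ServerManager is the new contract: loadSegment() is now idempotent, and a repeat load is reported with a false return value instead of a thrown SegmentLoadingException, which stays reserved for real failures. A minimal, self-contained sketch of that contract (plain Java, not the actual ServerManager; a ConcurrentMap stands in for the real timeline bookkeeping):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative model only: load() returns true exactly once per segment id,
// so callers can tell "newly loaded" apart from "already serving" without
// catching an exception.
public class IdempotentLoader
{
  private final ConcurrentMap<String, Boolean> loadedSegments = new ConcurrentHashMap<>();

  public boolean load(String segmentId)
  {
    // putIfAbsent returns null only for the first caller to insert the key
    return loadedSegments.putIfAbsent(segmentId, Boolean.TRUE) == null;
  }

  public static void main(String[] args)
  {
    IdempotentLoader loader = new IdempotentLoader();
    System.out.println(loader.load("wikipedia_2013-12-16"));  // true: newly loaded
    System.out.println(loader.load("wikipedia_2013-12-16"));  // false: already loaded
  }
}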

ZkCoordinator.java

@@ -230,34 +230,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     try {
       log.info("Loading segment %s", segment.getIdentifier());
 
+      final boolean loaded;
       try {
-        serverManager.loadSegment(segment);
+        loaded = serverManager.loadSegment(segment);
       }
       catch (Exception e) {
         removeSegment(segment);
         throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
       }
 
-      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-      if (!segmentInfoCacheFile.exists()) {
-        try {
-          jsonMapper.writeValue(segmentInfoCacheFile, segment);
-        }
-        catch (IOException e) {
-          removeSegment(segment);
-          throw new SegmentLoadingException(
-              e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-          );
-        }
-      }
+      if (loaded) {
+        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            removeSegment(segment);
+            throw new SegmentLoadingException(
+                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
+            );
+          }
+        }
 
-      try {
-        announcer.announceSegment(segment);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
-      }
+        try {
+          announcer.announceSegment(segment);
+        }
+        catch (IOException e) {
+          throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
+        }
+      }
     }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segment for dataSource")
@@ -275,8 +278,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
     for (DataSegment segment : segments) {
       log.info("Loading segment %s", segment.getIdentifier());
 
+      final boolean loaded;
       try {
-        serverManager.loadSegment(segment);
+        loaded = serverManager.loadSegment(segment);
       }
       catch (Exception e) {
         log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
@@ -285,20 +289,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler
         continue;
       }
 
-      File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
-      if (!segmentInfoCacheFile.exists()) {
-        try {
-          jsonMapper.writeValue(segmentInfoCacheFile, segment);
-        }
-        catch (IOException e) {
-          log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
-          removeSegment(segment);
-          segmentFailures.add(segment.getIdentifier());
-          continue;
-        }
-      }
+      if (loaded) {
+        File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
+            removeSegment(segment);
+            segmentFailures.add(segment.getIdentifier());
+            continue;
+          }
+        }
+      }
 
       validSegments.add(segment);
     }
 
     try {
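Both ZkCoordinator call sites now follow the same shape: capture the boolean from loadSegment() and perform the one-time bookkeeping (the segment info cache file and the announcement) only when the segment was genuinely new. A hedged sketch of that guard, with plain file I/O standing in for jsonMapper.writeValue and a println standing in for announcer.announceSegment:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Illustrative only: afterLoad() mirrors the if (loaded) guard above, skipping
// the cache write and the announcement entirely for repeat loads.
public class SegmentBookkeeping
{
  private final File infoDir;

  public SegmentBookkeeping(File infoDir)
  {
    this.infoDir = infoDir;
  }

  public void afterLoad(String segmentId, boolean loaded) throws IOException
  {
    if (!loaded) {
      return; // already served: nothing to persist or announce
    }
    File cacheFile = new File(infoDir, segmentId);
    if (!cacheFile.exists()) {
      // stand-in for jsonMapper.writeValue(segmentInfoCacheFile, segment)
      Files.write(cacheFile.toPath(), segmentId.getBytes(StandardCharsets.UTF_8));
    }
    // stand-in for announcer.announceSegment(segment)
    System.out.println("announced " + segmentId);
  }
}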

DruidCoordinator.java

@@ -46,10 +46,13 @@ import io.druid.client.ServerInventoryView;
 import io.druid.client.indexing.IndexingServiceClient;
 import io.druid.common.config.JacksonConfigManager;
 import io.druid.concurrent.Execs;
+import io.druid.curator.discovery.ServiceAnnouncer;
 import io.druid.db.DatabaseRuleManager;
 import io.druid.db.DatabaseSegmentManager;
 import io.druid.guice.ManageLifecycle;
+import io.druid.guice.annotations.Self;
 import io.druid.segment.IndexIO;
+import io.druid.server.DruidNode;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.timeline.DataSegment;
 import org.apache.curator.framework.CuratorFramework;
@@ -99,6 +102,8 @@ public class DruidCoordinator
   private final LoadQueueTaskMaster taskMaster;
   private final Map<String, LoadQueuePeon> loadManagementPeons;
   private final AtomicReference<LeaderLatch> leaderLatch;
+  private final ServiceAnnouncer serviceAnnouncer;
+  private final DruidNode self;
 
   @Inject
   public DruidCoordinator(
@@ -112,7 +117,9 @@ public class DruidCoordinator
       ServiceEmitter emitter,
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
-      LoadQueueTaskMaster taskMaster
+      LoadQueueTaskMaster taskMaster,
+      ServiceAnnouncer serviceAnnouncer,
+      @Self DruidNode self
   )
   {
     this(
@@ -127,6 +134,8 @@ public class DruidCoordinator
         scheduledExecutorFactory,
         indexingServiceClient,
         taskMaster,
+        serviceAnnouncer,
+        self,
         Maps.<String, LoadQueuePeon>newConcurrentMap()
     );
   }
@@ -143,6 +152,8 @@ public class DruidCoordinator
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
       LoadQueueTaskMaster taskMaster,
+      ServiceAnnouncer serviceAnnouncer,
+      DruidNode self,
       ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap
   )
   {
@@ -157,6 +168,8 @@ public class DruidCoordinator
     this.emitter = emitter;
     this.indexingServiceClient = indexingServiceClient;
     this.taskMaster = taskMaster;
+    this.serviceAnnouncer = serviceAnnouncer;
+    this.self = self;
 
     this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
@@ -474,6 +487,7 @@ public class DruidCoordinator
       databaseSegmentManager.start();
       databaseRuleManager.start();
       serverInventoryView.start();
+      serviceAnnouncer.announce(self);
 
       final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();
       dynamicConfigs = configManager.watch(
@@ -554,8 +568,10 @@ public class DruidCoordinator
       }
       loadManagementPeons.clear();
 
-      databaseSegmentManager.stop();
+      serviceAnnouncer.unannounce(self);
       serverInventoryView.stop();
+      databaseRuleManager.stop();
+      databaseSegmentManager.stop();
 
       leader = false;
     }
     catch (Exception e) {
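Two things change in the coordinator's leader lifecycle: on becoming leader it now announces itself through the injected ServiceAnnouncer, and on losing leadership it unannounces first, then stops its services in the reverse of their start order (inventory view, then rule manager, then segment manager). A small self-contained sketch of that last-started, first-stopped discipline; the class and printed names are illustrative stand-ins, not Druid's lifecycle API:

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: each start pushes its matching stop action, so stop()
// unwinds in reverse order and the node unannounces itself before any of
// the services it still depends on go away.
public class LeaderLifecycle
{
  private final Deque<Runnable> stopActions = new ArrayDeque<>();

  public void start(Runnable startAction, Runnable stopAction)
  {
    startAction.run();
    stopActions.push(stopAction); // last started is first stopped
  }

  public void stop()
  {
    while (!stopActions.isEmpty()) {
      stopActions.pop().run();
    }
  }

  public static void main(String[] args)
  {
    LeaderLifecycle lifecycle = new LeaderLifecycle();
    lifecycle.start(() -> System.out.println("segmentManager start"), () -> System.out.println("segmentManager stop"));
    lifecycle.start(() -> System.out.println("ruleManager start"), () -> System.out.println("ruleManager stop"));
    lifecycle.start(() -> System.out.println("inventoryView start"), () -> System.out.println("inventoryView stop"));
    lifecycle.start(() -> System.out.println("announce self"), () -> System.out.println("unannounce self"));
    // prints: unannounce self, inventoryView stop, ruleManager stop, segmentManager stop
    lifecycle.stop();
  }
}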

DruidCoordinatorTest.java

@@ -23,8 +23,10 @@ import com.google.common.collect.MapMaker;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import io.druid.client.DruidServer;
 import io.druid.client.SingleServerInventoryView;
+import io.druid.curator.discovery.NoopServiceAnnouncer;
 import io.druid.curator.inventory.InventoryManagerConfig;
 import io.druid.db.DatabaseSegmentManager;
+import io.druid.server.DruidNode;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
@@ -111,6 +113,8 @@ public class DruidCoordinatorTest
         scheduledExecutorFactory,
         null,
         taskMaster,
+        new NoopServiceAnnouncer(),
+        new DruidNode("hey", "what", 1234),
         loadManagementPeons
     );
   }
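The test covers the two new constructor parameters with the null-object pattern: a NoopServiceAnnouncer and a throwaway DruidNode keep the test from touching any real discovery machinery. A generic sketch of that pattern; the interface shape here is illustrative, not Druid's ServiceAnnouncer:

// Illustrative null object: it implements the dependency's interface with
// deliberate no-ops, so a unit test can construct the class under test
// without standing up ZooKeeper-backed service discovery.
interface Announcer
{
  void announce(String node);

  void unannounce(String node);
}

final class NoopAnnouncer implements Announcer
{
  @Override
  public void announce(String node)
  {
    // intentionally empty
  }

  @Override
  public void unannounce(String node)
  {
    // intentionally empty
  }
}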

CliCoordinator.java

@@ -28,7 +28,6 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.logger.Logger;
 import io.airlift.command.Command;
 import io.druid.client.indexing.IndexingServiceClient;
-import io.druid.curator.discovery.DiscoveryModule;
 import io.druid.db.DatabaseRuleManager;
 import io.druid.db.DatabaseRuleManagerConfig;
 import io.druid.db.DatabaseRuleManagerProvider;
@@ -41,7 +40,6 @@ import io.druid.guice.JsonConfigProvider;
 import io.druid.guice.LazySingleton;
 import io.druid.guice.LifecycleModule;
 import io.druid.guice.ManageLifecycle;
-import io.druid.guice.annotations.Self;
 import io.druid.server.coordinator.DruidCoordinator;
 import io.druid.server.coordinator.DruidCoordinatorConfig;
 import io.druid.server.coordinator.LoadQueueTaskMaster;
@@ -103,7 +101,6 @@ public class CliCoordinator extends ServerRunnable
         binder.bind(DruidCoordinator.class);
         LifecycleModule.register(binder, DruidCoordinator.class);
 
-        DiscoveryModule.register(binder, Self.class);
 
         binder.bind(JettyServerInitializer.class).toInstance(new CoordinatorJettyServerInitializer());
 
         Jerseys.addResource(binder, BackwardsCompatibleInfoResource.class);