Things don't work

This commit is contained in:
fjy 2013-12-13 13:35:22 -08:00
parent 7c744ffda0
commit ca7335f45f
23 changed files with 259 additions and 25 deletions

View File

@ -218,9 +218,9 @@ Congratulations! The segment has completed building. Once a segment is built, a
You should see the following logs on the coordinator:
```bash
2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Load Queues:
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served.
2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues:
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served.
```
These logs indicate that the coordinator has assigned our new segment to the historical node to download and serve. If you look at the historical node logs, you should see:

View File

@ -36,7 +36,9 @@ import java.util.concurrent.ConcurrentMap;
*/
public class DruidServer implements Comparable
{
public static final int DEFAULT_NUM_REPLICANTS = 2;
public static final String DEFAULT_TIER = "_default_tier";
private static final Logger log = new Logger(DruidServer.class);
private final Object lock = new Object();

View File

@ -32,7 +32,10 @@ public class DruidServerConfig
private long maxSize = 0;
@JsonProperty
private String tier = "_default_tier";
private String tier = DruidServer.DEFAULT_TIER;
@JsonProperty
private String zone = DruidServer.DEFAULT_ZONE;
public long getMaxSize()
{
@ -43,4 +46,8 @@ public class DruidServerConfig
{
return tier;
}
public String getZone() {
return zone;
}
}

View File

@ -31,9 +31,11 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
@ -86,10 +88,9 @@ public class DatabaseRuleManager
}
final List<Rule> defaultRules = Arrays.<Rule>asList(
new PeriodLoadRule(
new Period("P5000Y"),
2,
"_default_tier"
new ForeverLoadRule(
DruidServer.DEFAULT_NUM_REPLICANTS,
DruidServer.DEFAULT_TIER
)
);
final String version = new DateTime().toString();

View File

@ -50,6 +50,13 @@ import io.druid.db.DatabaseRuleManager;
import io.druid.db.DatabaseSegmentManager;
import io.druid.guice.ManageLifecycle;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.server.coordinator.helper.DruidCoordinatorCleanup;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -310,7 +317,7 @@ public class DruidCoordinator
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&

View File

@ -21,7 +21,7 @@ package io.druid.server.coordinator;
/**
*/
public abstract class LoadPeonCallback
public interface LoadPeonCallback
{
protected abstract void execute();
public void execute();
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -25,6 +25,14 @@ import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -163,7 +171,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
callback = new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
if (movingSegments != null) {

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
@ -25,6 +25,14 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -69,7 +77,7 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
segment, new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
}
}

View File

@ -17,7 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
/**
*/

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
@ -27,6 +27,11 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.collections.CountingMap;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import java.util.Map;

View File

@ -17,10 +17,15 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.metamx.emitter.EmittingLogger;
import io.druid.db.DatabaseRuleManager;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;

View File

@ -17,9 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.metamx.common.logger.Logger;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.timeline.DataSegment;
import java.util.Set;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -33,6 +33,9 @@ import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;

View File

@ -0,0 +1,66 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.helper;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.timeline.DataSegment;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Coordinator helper that scans the available segments each run and asks the
 * indexing service to rebuild any segment whose binary version is missing or
 * older than {@link IndexIO#CURRENT_VERSION_ID}. An optional datasource
 * whitelist restricts which datasources are considered.
 */
public class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper
{
  private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorVersionConverter.class);

  // Client used to submit segment-upgrade tasks to the indexing service.
  private final IndexingServiceClient indexingServiceClient;
  // Holds the current whitelist; a null value means "all datasources".
  private final AtomicReference<DatasourceWhitelist> whitelistRef;

  public DruidCoordinatorVersionConverter(
      IndexingServiceClient indexingServiceClient,
      AtomicReference<DatasourceWhitelist> whitelistRef
  )
  {
    this.indexingServiceClient = indexingServiceClient;
    this.whitelistRef = whitelistRef;
  }

  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    final DatasourceWhitelist whitelist = whitelistRef.get();

    for (DataSegment segment : params.getAvailableSegments()) {
      // Skip datasources excluded by the whitelist (null whitelist allows everything).
      if (whitelist != null && !whitelist.contains(segment.getDataSource())) {
        continue;
      }
      // Skip segments already at (or beyond) the current binary format version.
      final Integer binaryVersion = segment.getBinaryVersion();
      if (binaryVersion != null && binaryVersion >= IndexIO.CURRENT_VERSION_ID) {
        continue;
      }
      log.info("Upgrading version on segment[%s]", segment.getIdentifier());
      indexingServiceClient.upgradeSegment(segment);
    }

    // Params are passed through unchanged; this helper only triggers side effects.
    return params;
  }
}

View File

@ -0,0 +1,40 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
/**
 * Drop rule that matches every segment unconditionally, regardless of the
 * segment's interval or the reference timestamp.
 */
public class ForeverDropRule extends DropRule
{
  @Override
  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
  {
    // Matches all segments for all time.
    return true;
  }

  @Override
  public String getType()
  {
    return "dropForever";
  }
}

View File

@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
/**
 * Load rule that applies to every segment for all time, loading a fixed number
 * of replicants onto a single tier.
 */
public class ForeverLoadRule extends LoadRule
{
  // Hard-coded fallback when no replicant count is supplied.
  // NOTE(review): elsewhere in this commit the default lives in
  // DruidServer.DEFAULT_NUM_REPLICANTS (also 2) — consider referencing it for consistency.
  private static final int DEFAULT_REPLICANTS = 2;

  private final Integer replicants;
  private final String tier;

  @JsonCreator
  public ForeverLoadRule(
      @JsonProperty("replicants") Integer replicants,
      @JsonProperty("tier") String tier
  )
  {
    this.replicants = (replicants == null) ? DEFAULT_REPLICANTS : replicants;
    this.tier = tier;
  }

  @Override
  public int getReplicants()
  {
    return replicants;
  }

  @Override
  public int getReplicants(String tier)
  {
    // Guard against a null configured tier; the original unguarded
    // this.tier.equalsIgnoreCase(tier) would throw an NPE in that case.
    return (this.tier != null && this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
  }

  @Override
  public String getTier()
  {
    // BUG FIX: previously returned null, discarding the configured tier even
    // though getReplicants(String) matches against it. Callers of getTier()
    // would have seen no tier for this rule.
    return tier;
  }

  @Override
  public String getType()
  {
    return "loadForever";
  }

  @Override
  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
  {
    // Applies to every segment for all time.
    return true;
  }
}

View File

@ -120,7 +120,7 @@ public abstract class LoadRule implements Rule
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
replicationManager.unregisterReplicantCreation(
getTier(),
@ -197,7 +197,7 @@ public abstract class LoadRule implements Rule
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
replicationManager.unregisterReplicantTermination(
getTier(),

View File

@ -33,6 +33,7 @@ import org.joda.time.DateTime;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
@JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class),
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
@JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class),

View File

@ -61,7 +61,7 @@ public class CoordinatorResource
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
return;
}
@ -91,7 +91,7 @@ public class CoordinatorResource
segmentToDrop.getFromServer(), segmentToDrop.getSegmentName(), new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
return;
}

View File

@ -29,6 +29,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.db.DatabaseRuleManager;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;

View File

@ -20,6 +20,7 @@
package io.druid.server.coordinator;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.timeline.DataSegment;
public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
@ -59,7 +60,7 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
loadPeon.loadSegment(segment.getSegment(), new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
}
});

View File

@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.client.DruidServer;
import io.druid.db.DatabaseRuleManager;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.IntervalDropRule;
import io.druid.server.coordinator.rules.Rule;

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import junit.framework.Assert;