some fixes according to code review comments

This commit is contained in:
fjy 2013-06-06 11:02:08 -07:00
parent 8202bc70eb
commit 451d3d358b
6 changed files with 25 additions and 20 deletions

View File

@ -108,7 +108,7 @@ public class IndexingServiceClient
throw new ISE("Cannot find instance of indexingService");
}
return String.format("http://%s:%s/mmx/indexer/v1", instance.getAddress(), instance.getPort());
return String.format("http://%s:%s/druid/indexer/v1", instance.getAddress(), instance.getPort());
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@ -56,6 +56,15 @@ public abstract class AbstractTask implements Task
this(id, id, id, dataSource, interval);
}
protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.availabilityGroup = id;
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
}
protected AbstractTask(String id, String groupId, String availabilityGroup, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");

View File

@ -53,17 +53,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
public class IndexGeneratorTask extends AbstractTask
{
private static String makeTaskId(String groupId, DateTime start, DateTime end, int partitionNum)
{
return String.format(
"%s_generator_%s_%s_%s",
groupId,
start,
end,
partitionNum
);
}
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@ -88,9 +77,14 @@ public class IndexGeneratorTask extends AbstractTask
super(
id != null
? id
: makeTaskId(groupId, interval.getStart(), interval.getEnd(), schema.getShardSpec().getPartitionNum()),
: String.format(
"%s_generator_%s_%s_%s",
groupId,
interval.getStart(),
interval.getEnd(),
schema.getShardSpec().getPartitionNum()
),
groupId,
makeTaskId(groupId, interval.getStart(), interval.getEnd(), schema.getShardSpec().getPartitionNum()),
schema.getDataSource(),
Preconditions.checkNotNull(interval, "interval")
);

View File

@ -64,7 +64,7 @@ public class RealtimeIndexTask extends AbstractTask
{
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
private static String makeTaskId(String dataSource, int partitionNum, DateTime version)
private static String makeTaskId(String dataSource, int partitionNum, String version)
{
return String.format(
"index_realtime_%s_%d_%s",
@ -114,14 +114,16 @@ public class RealtimeIndexTask extends AbstractTask
)
{
super(
id != null ? id : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()),
id != null
? id
: makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()),
String.format(
"index_realtime_%s",
schema.getDataSource()
),
availabilityGroup != null
? availabilityGroup
: makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime()),
: makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()),
schema.getDataSource(),
null
);

View File

@ -345,8 +345,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
)
), "/*", 0
);
root.addFilter(GuiceFilter.class, "/mmx/indexer/v1/*", 0);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0);
root.addFilter(GuiceFilter.class, "/druid/indexer/v1/*", 0);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", 0); //backwards compatability, soon to be removed
initialized = true;
}

View File

@ -63,7 +63,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
*/
@Path("/mmx/indexer/v1")
@Path("/druid/indexer/v1")
public class IndexerCoordinatorResource
{
private static final Logger log = new Logger(IndexerCoordinatorResource.class);