mirror of https://github.com/apache/druid.git
1) Refactor the configuration code that WorkerSetupManager was using so that it is reusable
2) Use the new ConfigManager to load a whitelist for the Master
This commit is contained in:
parent
6e95dd49b3
commit
9ffccb6803
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merge;
|
||||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
@ -29,7 +29,7 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class ClientAppendQuery implements ClientMergeQuery
|
||||
public class ClientAppendQuery
|
||||
{
|
||||
private final String dataSource;
|
||||
private final List<DataSegment> segments;
|
||||
|
@ -45,14 +45,18 @@ public class ClientAppendQuery implements ClientMergeQuery
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "append";
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public List<DataSegment> getSegments()
|
||||
{
|
||||
return segments;
|
|
@ -0,0 +1,57 @@
|
|||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ClientConversionQuery
|
||||
{
|
||||
private final String dataSource;
|
||||
private final Interval interval;
|
||||
private final DataSegment segment;
|
||||
|
||||
public ClientConversionQuery(
|
||||
DataSegment segment
|
||||
)
|
||||
{
|
||||
this.dataSource = segment.getDataSource();
|
||||
this.interval = segment.getInterval();
|
||||
this.segment = segment;
|
||||
}
|
||||
|
||||
public ClientConversionQuery(
|
||||
String dataSource,
|
||||
Interval interval
|
||||
)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
this.interval = interval;
|
||||
this.segment = null;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getType()
|
||||
{
|
||||
return "version_converter";
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public DataSegment getSegment()
|
||||
{
|
||||
return segment;
|
||||
}
|
||||
}
|
|
@ -17,21 +17,21 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merge;
|
||||
|
||||
|
||||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
public class ClientDeleteQuery
|
||||
/**
|
||||
*/
|
||||
public class ClientKillQuery
|
||||
{
|
||||
private final String dataSource;
|
||||
private final Interval interval;
|
||||
|
||||
@JsonCreator
|
||||
public ClientDeleteQuery(
|
||||
public ClientKillQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval
|
||||
)
|
||||
|
@ -40,22 +40,21 @@ public class ClientDeleteQuery
|
|||
this.interval = interval;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getType()
|
||||
{
|
||||
return "kill";
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ClientDeleteQuery{" +
|
||||
"dataSource='" + dataSource + '\'' +
|
||||
", interval=" + interval +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merge;
|
||||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
@ -30,14 +30,14 @@ import java.util.List;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class ClientDefaultMergeQuery implements ClientMergeQuery
|
||||
public class ClientMergeQuery
|
||||
{
|
||||
private final String dataSource;
|
||||
private final List<DataSegment> segments;
|
||||
private final List<AggregatorFactory> aggregators;
|
||||
|
||||
@JsonCreator
|
||||
public ClientDefaultMergeQuery(
|
||||
public ClientMergeQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("segments") List<DataSegment> segments,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
|
||||
|
@ -50,14 +50,18 @@ public class ClientDefaultMergeQuery implements ClientMergeQuery
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "append";
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public List<DataSegment> getSegments()
|
||||
{
|
||||
return segments;
|
||||
|
@ -72,7 +76,7 @@ public class ClientDefaultMergeQuery implements ClientMergeQuery
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ClientDefaultMergeQuery{" +
|
||||
return "ClientMergeQuery{" +
|
||||
"dataSource='" + dataSource + '\'' +
|
||||
", segments=" + segments +
|
||||
", aggregators=" + aggregators +
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.client.indexing;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
||||
import com.netflix.curator.x.discovery.ServiceInstance;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class IndexingServiceClient
|
||||
{
|
||||
private static final InputStreamResponseHandler RESPONSE_HANDLER = new InputStreamResponseHandler();
|
||||
|
||||
private final HttpClient client;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ServiceProvider serviceProvider;
|
||||
|
||||
public IndexingServiceClient(
|
||||
HttpClient client,
|
||||
ObjectMapper jsonMapper,
|
||||
ServiceProvider serviceProvider
|
||||
)
|
||||
{
|
||||
this.client = client;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
public void mergeSegments(List<DataSegment> segments)
|
||||
{
|
||||
final Iterator<DataSegment> segmentsIter = segments.iterator();
|
||||
if (!segmentsIter.hasNext()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String dataSource = segmentsIter.next().getDataSource();
|
||||
while (segmentsIter.hasNext()) {
|
||||
DataSegment next = segmentsIter.next();
|
||||
if (!dataSource.equals(next.getDataSource())) {
|
||||
throw new IAE("Cannot merge segments of different dataSources[%s] and [%s]", dataSource, next.getDataSource());
|
||||
}
|
||||
}
|
||||
|
||||
runQuery("merge", new ClientAppendQuery(dataSource, segments));
|
||||
}
|
||||
|
||||
public void killSegments(String dataSource, Interval interval)
|
||||
{
|
||||
runQuery("index", new ClientKillQuery(dataSource, interval));
|
||||
}
|
||||
|
||||
public void upgradeSegment(DataSegment dataSegment)
|
||||
{
|
||||
runQuery("task", new ClientConversionQuery(dataSegment));
|
||||
}
|
||||
|
||||
public void upgradeSegments(String dataSource, Interval interval)
|
||||
{
|
||||
runQuery("task", new ClientConversionQuery(dataSource, interval));
|
||||
}
|
||||
|
||||
private InputStream runQuery(String endpoint, Object queryObject)
|
||||
{
|
||||
try {
|
||||
return client.post(new URL(String.format("%s/%s", baseUrl(), endpoint)))
|
||||
.setContent("application/json", jsonMapper.writeValueAsBytes(queryObject))
|
||||
.go(RESPONSE_HANDLER)
|
||||
.get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String baseUrl()
|
||||
{
|
||||
try {
|
||||
final ServiceInstance instance = serviceProvider.getInstance();
|
||||
return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
package com.metamx.druid.merge;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ClientKillQuery
|
||||
{
|
||||
private final String dataSource;
|
||||
private final Interval interval;
|
||||
|
||||
@JsonCreator
|
||||
public ClientKillQuery(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval
|
||||
)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getType()
|
||||
{
|
||||
return "kill";
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
}
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merge;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
|
||||
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeInfo(use= JsonTypeInfo.Id.NAME, property="type", defaultImpl = ClientDefaultMergeQuery.class)
|
||||
@JsonSubTypes(value={
|
||||
@JsonSubTypes.Type(name="append", value=ClientAppendQuery.class)
|
||||
})
|
||||
public interface ClientMergeQuery
|
||||
{
|
||||
public String getDataSource();
|
||||
|
||||
public List<DataSegment> getSegments();
|
||||
}
|
|
@ -0,0 +1,259 @@
|
|||
package com.metamx.druid.config;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import org.joda.time.Duration;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.StatementContext;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
import org.skife.jdbi.v2.tweak.ResultSetMapper;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ConfigManager
|
||||
{
|
||||
private static final Logger log = new Logger(ConfigManager.class);
|
||||
|
||||
private final Object lock = new Object();
|
||||
private boolean started = false;
|
||||
|
||||
private final IDBI dbi;
|
||||
private final ConfigManagerConfig config;
|
||||
|
||||
private final ScheduledExecutorService exec;
|
||||
private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
|
||||
private final String selectStatement;
|
||||
|
||||
public ConfigManager(IDBI dbi, ConfigManagerConfig config)
|
||||
{
|
||||
this.dbi = dbi;
|
||||
this.config = config;
|
||||
|
||||
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
|
||||
this.watchedConfigs = Maps.newConcurrentMap();
|
||||
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", config.getConfigTable());
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec,
|
||||
new Duration(0),
|
||||
config.getPollDuration(),
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
poll();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
|
||||
private void poll()
|
||||
{
|
||||
for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
|
||||
try {
|
||||
if (entry.getValue().swapIfNew(lookup(entry.getKey()))) {
|
||||
log.info("New value for key[%s] seen.", entry.getKey());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Exception when checking property[%s]", entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> AtomicReference<T> watchConfig(final String key, final ConfigSerde<T> serde)
|
||||
{
|
||||
ConfigHolder<T> holder = watchedConfigs.get(key);
|
||||
if (holder == null) {
|
||||
try {
|
||||
log.info("Creating watch for key[%s]", key);
|
||||
|
||||
holder = exec.submit(
|
||||
new Callable<ConfigHolder<T>>()
|
||||
{
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ConfigHolder<T> call() throws Exception
|
||||
{
|
||||
if (!started) {
|
||||
watchedConfigs.put(key, new ConfigHolder<T>(null, serde));
|
||||
}
|
||||
else {
|
||||
try {
|
||||
// Multiple of these callables can be submitted at the same time, but the callables themselves
|
||||
// are executed serially, so double check that it hasn't already been populated.
|
||||
if (!watchedConfigs.containsKey(key)) {
|
||||
byte[] value = lookup(key);
|
||||
ConfigHolder<T> holder = new ConfigHolder<T>(value, serde);
|
||||
watchedConfigs.put(key, holder);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed loading config for key[%s]", key);
|
||||
watchedConfigs.put(key, new ConfigHolder<T>(null, serde));
|
||||
}
|
||||
}
|
||||
|
||||
return watchedConfigs.get(key);
|
||||
}
|
||||
}
|
||||
).get();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
return holder.getReference();
|
||||
}
|
||||
|
||||
public byte[] lookup(final String key)
|
||||
{
|
||||
return dbi.withHandle(
|
||||
new HandleCallback<byte[]>()
|
||||
{
|
||||
@Override
|
||||
public byte[] withHandle(Handle handle) throws Exception
|
||||
{
|
||||
return handle.createQuery(selectStatement)
|
||||
.bind("name", key)
|
||||
.map(
|
||||
new ResultSetMapper<byte[]>()
|
||||
{
|
||||
@Override
|
||||
public byte[] map(int index, ResultSet r, StatementContext ctx) throws SQLException
|
||||
{
|
||||
return r.getBytes("payload");
|
||||
}
|
||||
}
|
||||
)
|
||||
.first();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public <T> boolean set(final String key, final ConfigSerde<T> serde, final T obj)
|
||||
{
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final byte[] newBytes = serde.serialize(obj);
|
||||
|
||||
try {
|
||||
return exec.submit(
|
||||
new Callable<Boolean>()
|
||||
{
|
||||
@Override
|
||||
public Boolean call() throws Exception
|
||||
{
|
||||
dbi.withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
handle.createStatement("INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload")
|
||||
.bind("name", key)
|
||||
.bind("payload", newBytes)
|
||||
.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
final ConfigHolder configHolder = watchedConfigs.get(key);
|
||||
if (configHolder != null) {
|
||||
configHolder.swapIfNew(newBytes);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
).get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Failed to set[%s]", key);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ConfigHolder<T>
|
||||
{
|
||||
private final AtomicReference<byte[]> rawBytes;
|
||||
private final ConfigSerde<T> serde;
|
||||
private final AtomicReference<T> reference;
|
||||
|
||||
ConfigHolder(
|
||||
byte[] rawBytes,
|
||||
ConfigSerde<T> serde
|
||||
)
|
||||
{
|
||||
this.rawBytes = new AtomicReference<byte[]>(rawBytes);
|
||||
this.serde = serde;
|
||||
this.reference = new AtomicReference<T>(serde.deserialize(rawBytes));
|
||||
}
|
||||
|
||||
public AtomicReference<T> getReference()
|
||||
{
|
||||
return reference;
|
||||
}
|
||||
|
||||
public boolean swapIfNew(byte[] newBytes)
|
||||
{
|
||||
if (!Arrays.equals(newBytes, rawBytes.get())) {
|
||||
reference.set(serde.deserialize(newBytes));
|
||||
rawBytes.set(newBytes);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package com.metamx.druid.config;
|
||||
|
||||
import org.joda.time.Duration;
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class ConfigManagerConfig
|
||||
{
|
||||
@Config("druid.indexer.configTable")
|
||||
public abstract String getConfigTable();
|
||||
|
||||
@Config("druid.indexer.poll.duration")
|
||||
@Default("PT1M")
|
||||
public abstract Duration getPollDuration();
|
||||
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package com.metamx.druid.config;
|
||||
|
||||
/**
 * Converts a config object of type {@code T} to and from its stored byte representation.
 */
public interface ConfigSerde<T>
{
  /**
   * @param obj the config object to encode
   * @return the bytes to store for {@code obj}
   */
  byte[] serialize(T obj);

  /**
   * @param bytes the stored bytes to decode
   * @return the config object decoded from {@code bytes}
   */
  T deserialize(byte[] bytes);
}
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.config;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class JacksonConfigManager
|
||||
{
|
||||
private final ConfigManager configManager;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public JacksonConfigManager(
|
||||
ConfigManager configManager,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.configManager = configManager;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
||||
public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz)
|
||||
{
|
||||
return watch(key, clazz, null);
|
||||
}
|
||||
|
||||
public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz, T defaultVal)
|
||||
{
|
||||
return configManager.watchConfig(key, create(clazz, defaultVal));
|
||||
}
|
||||
|
||||
public <T> AtomicReference<T> watch(String key, TypeReference<T> clazz)
|
||||
{
|
||||
return watch(key, clazz, null);
|
||||
}
|
||||
|
||||
public <T> AtomicReference<T> watch(String key, TypeReference<T> clazz, T defaultVal)
|
||||
{
|
||||
return configManager.watchConfig(key, create(clazz, defaultVal));
|
||||
}
|
||||
|
||||
public <T> boolean set(String key, T val)
|
||||
{
|
||||
return configManager.set(key, create(val.getClass(), null), val);
|
||||
}
|
||||
|
||||
private <T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
|
||||
{
|
||||
return new ConfigSerde<T>()
|
||||
{
|
||||
@Override
|
||||
public byte[] serialize(T obj)
|
||||
{
|
||||
try {
|
||||
return jsonMapper.writeValueAsBytes(obj);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public T deserialize(byte[] bytes)
|
||||
{
|
||||
if (bytes == null) {
|
||||
return defaultVal;
|
||||
}
|
||||
|
||||
try {
|
||||
return jsonMapper.readValue(bytes, clazz);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private <T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal)
|
||||
{
|
||||
return new ConfigSerde<T>()
|
||||
{
|
||||
@Override
|
||||
public byte[] serialize(T obj)
|
||||
{
|
||||
try {
|
||||
return jsonMapper.writeValueAsBytes(obj);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public T deserialize(byte[] bytes)
|
||||
{
|
||||
if (bytes == null) {
|
||||
return defaultVal;
|
||||
}
|
||||
|
||||
try {
|
||||
return jsonMapper.readValue(bytes, clazz);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -65,7 +65,7 @@ public class DbConnector
|
|||
dbi,
|
||||
configTableName,
|
||||
String.format(
|
||||
"CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))",
|
||||
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
|
||||
configTableName
|
||||
)
|
||||
);
|
||||
|
@ -84,12 +84,7 @@ public class DbConnector
|
|||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
List<Map<String, Object>> table = handle.select(
|
||||
String.format(
|
||||
"SHOW tables LIKE '%s'",
|
||||
tableName
|
||||
)
|
||||
);
|
||||
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
|
||||
|
||||
if (table.isEmpty()) {
|
||||
log.info("Creating table[%s]", tableName);
|
||||
|
|
|
@ -46,7 +46,7 @@ import java.util.Map;
|
|||
|
||||
/**
|
||||
*/
|
||||
public class AppendTask extends MergeTask
|
||||
public class AppendTask extends MergeTaskBase
|
||||
{
|
||||
@JsonCreator
|
||||
public AppendTask(
|
||||
|
|
|
@ -1,89 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merger.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.QueryableIndex;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.index.v1.IndexMerger;
|
||||
|
||||
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DefaultMergeTask extends MergeTask
|
||||
{
|
||||
private final List<AggregatorFactory> aggregators;
|
||||
|
||||
@JsonCreator
|
||||
public DefaultMergeTask(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("segments") List<DataSegment> segments,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
|
||||
)
|
||||
{
|
||||
super(dataSource, segments);
|
||||
this.aggregators = aggregators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File merge(final Map<DataSegment, File> segments, final File outDir)
|
||||
throws Exception
|
||||
{
|
||||
return IndexMerger.mergeQueryableIndex(
|
||||
Lists.transform(
|
||||
ImmutableList.copyOf(segments.values()),
|
||||
new Function<File, QueryableIndex>()
|
||||
{
|
||||
@Override
|
||||
public QueryableIndex apply(@Nullable File input)
|
||||
{
|
||||
try {
|
||||
return IndexIO.loadIndex(input);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
),
|
||||
aggregators.toArray(new AggregatorFactory[aggregators.size()]),
|
||||
outDir
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "merge";
|
||||
}
|
||||
}
|
|
@ -19,301 +19,71 @@
|
|||
|
||||
package com.metamx.druid.merger.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.merger.common.TaskLock;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.actions.LockListAction;
|
||||
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.AlertEvent;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
import com.metamx.druid.index.QueryableIndex;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.index.v1.IndexMerger;
|
||||
|
||||
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultMergeTask.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "append", value = AppendTask.class)
|
||||
})
|
||||
public abstract class MergeTask extends AbstractTask
|
||||
public class MergeTask extends MergeTaskBase
|
||||
{
|
||||
private final List<DataSegment> segments;
|
||||
private final List<AggregatorFactory> aggregators;
|
||||
|
||||
private static final EmittingLogger log = new EmittingLogger(MergeTask.class);
|
||||
|
||||
protected MergeTask(final String dataSource, final List<DataSegment> segments)
|
||||
{
|
||||
super(
|
||||
// _not_ the version, just something uniqueish
|
||||
String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()),
|
||||
dataSource,
|
||||
computeMergedInterval(segments)
|
||||
);
|
||||
|
||||
// Verify segment list is nonempty
|
||||
Preconditions.checkArgument(segments.size() > 0, "segments nonempty");
|
||||
// Verify segments are all in the correct datasource
|
||||
Preconditions.checkArgument(
|
||||
Iterables.size(
|
||||
Iterables.filter(
|
||||
segments,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment segment)
|
||||
{
|
||||
return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
|
||||
}
|
||||
}
|
||||
@JsonCreator
|
||||
public MergeTask(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("segments") List<DataSegment> segments,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
|
||||
)
|
||||
) == 0, "segments in the wrong datasource"
|
||||
);
|
||||
// Verify segments are all unsharded
|
||||
Preconditions.checkArgument(
|
||||
Iterables.size(
|
||||
Iterables.filter(
|
||||
segments,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment segment)
|
||||
{
|
||||
return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec);
|
||||
}
|
||||
}
|
||||
)
|
||||
) == 0, "segments without NoneShardSpec"
|
||||
);
|
||||
|
||||
this.segments = segments;
|
||||
super(dataSource, segments);
|
||||
this.aggregators = aggregators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
public File merge(final Map<DataSegment, File> segments, final File outDir)
|
||||
throws Exception
|
||||
{
|
||||
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
|
||||
final ServiceEmitter emitter = toolbox.getEmitter();
|
||||
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
|
||||
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
|
||||
final File taskDir = toolbox.getTaskDir();
|
||||
|
||||
try {
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
log.info(
|
||||
"Starting merge of id[%s], segments: %s",
|
||||
getId(),
|
||||
return IndexMerger.mergeQueryableIndex(
|
||||
Lists.transform(
|
||||
segments,
|
||||
new Function<DataSegment, String>()
|
||||
ImmutableList.copyOf(segments.values()),
|
||||
new Function<File, QueryableIndex>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable DataSegment input)
|
||||
public QueryableIndex apply(@Nullable File input)
|
||||
{
|
||||
return input.getIdentifier();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
// download segments to merge
|
||||
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
|
||||
|
||||
// merge files together
|
||||
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
|
||||
|
||||
emitter.emit(builder.build("merger/numMerged", segments.size()));
|
||||
emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime));
|
||||
|
||||
log.info(
|
||||
"[%s] : Merged %d segments in %,d millis",
|
||||
mergedSegment.getDataSource(),
|
||||
segments.size(),
|
||||
System.currentTimeMillis() - startTime
|
||||
);
|
||||
|
||||
long uploadStart = System.currentTimeMillis();
|
||||
|
||||
// Upload file
|
||||
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment);
|
||||
|
||||
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
|
||||
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
|
||||
|
||||
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
|
||||
|
||||
return TaskStatus.success(getId());
|
||||
try {
|
||||
return IndexIO.loadIndex(input);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource())
|
||||
.addData("interval", mergedSegment.getInterval())
|
||||
.emit();
|
||||
|
||||
return TaskStatus.failure(getId());
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks pre-existing segments in "context" to confirm that this merge query is valid. Specifically, confirm that
|
||||
* we are operating on every segment that overlaps the chosen interval.
|
||||
*/
|
||||
@Override
|
||||
public TaskStatus preflight(TaskToolbox toolbox)
|
||||
{
|
||||
final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getIdentifier();
|
||||
}
|
||||
};
|
||||
|
||||
final Set<String> current = ImmutableSet.copyOf(
|
||||
Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier)
|
||||
),
|
||||
aggregators.toArray(new AggregatorFactory[aggregators.size()]),
|
||||
outDir
|
||||
);
|
||||
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
|
||||
|
||||
final Set<String> missingFromRequested = Sets.difference(current, requested);
|
||||
if (!missingFromRequested.isEmpty()) {
|
||||
throw new ISE(
|
||||
"Merge is invalid: current segment(s) are not in the requested set: %s",
|
||||
Joiner.on(", ").join(missingFromRequested)
|
||||
);
|
||||
}
|
||||
|
||||
final Set<String> missingFromCurrent = Sets.difference(requested, current);
|
||||
if (!missingFromCurrent.isEmpty()) {
|
||||
throw new ISE(
|
||||
"Merge is invalid: requested segment(s) are not in the current set: %s",
|
||||
Joiner.on(", ").join(missingFromCurrent)
|
||||
);
|
||||
}
|
||||
|
||||
return TaskStatus.running(getId());
|
||||
|
||||
}
|
||||
|
||||
protected abstract File merge(Map<DataSegment, File> segments, File outDir)
|
||||
throws Exception;
|
||||
|
||||
@JsonProperty
|
||||
public List<DataSegment> getSegments()
|
||||
{
|
||||
return segments;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
public String getType()
|
||||
{
|
||||
return Objects.toStringHelper(this)
|
||||
.add("id", getId())
|
||||
.add("dataSource", getDataSource())
|
||||
.add("interval", getImplicitLockInterval())
|
||||
.add("segments", segments)
|
||||
.toString();
|
||||
}
|
||||
|
||||
private static String computeProcessingID(final String dataSource, final List<DataSegment> segments)
|
||||
{
|
||||
final String segmentIDs = Joiner.on("_").join(
|
||||
Iterables.transform(
|
||||
Ordering.natural().sortedCopy(segments), new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(DataSegment x)
|
||||
{
|
||||
return String.format(
|
||||
"%s_%s_%s_%s",
|
||||
x.getInterval().getStart(),
|
||||
x.getInterval().getEnd(),
|
||||
x.getVersion(),
|
||||
x.getShardSpec().getPartitionNum()
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
return String.format(
|
||||
"%s_%s",
|
||||
dataSource,
|
||||
Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase()
|
||||
);
|
||||
}
|
||||
|
||||
private static Interval computeMergedInterval(final List<DataSegment> segments)
|
||||
{
|
||||
Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0");
|
||||
|
||||
DateTime start = null;
|
||||
DateTime end = null;
|
||||
|
||||
for(final DataSegment segment : segments) {
|
||||
if(start == null || segment.getInterval().getStart().isBefore(start)) {
|
||||
start = segment.getInterval().getStart();
|
||||
}
|
||||
|
||||
if(end == null || segment.getInterval().getEnd().isAfter(end)) {
|
||||
end = segment.getInterval().getEnd();
|
||||
}
|
||||
}
|
||||
|
||||
return new Interval(start, end);
|
||||
}
|
||||
|
||||
private static DataSegment computeMergedSegment(
|
||||
final String dataSource,
|
||||
final String version,
|
||||
final List<DataSegment> segments
|
||||
)
|
||||
{
|
||||
final Interval mergedInterval = computeMergedInterval(segments);
|
||||
final Set<String> mergedDimensions = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
final Set<String> mergedMetrics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
mergedDimensions.addAll(segment.getDimensions());
|
||||
mergedMetrics.addAll(segment.getMetrics());
|
||||
}
|
||||
|
||||
return DataSegment.builder()
|
||||
.dataSource(dataSource)
|
||||
.interval(mergedInterval)
|
||||
.version(version)
|
||||
.shardSpec(new NoneShardSpec())
|
||||
.dimensions(Lists.newArrayList(mergedDimensions))
|
||||
.metrics(Lists.newArrayList(mergedMetrics))
|
||||
.build();
|
||||
return "merge";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,315 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merger.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.merger.common.TaskLock;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.actions.LockListAction;
|
||||
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
|
||||
import com.metamx.druid.shard.NoneShardSpec;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.AlertEvent;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class MergeTaskBase extends AbstractTask
|
||||
{
|
||||
private final List<DataSegment> segments;
|
||||
|
||||
private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class);
|
||||
|
||||
protected MergeTaskBase(final String dataSource, final List<DataSegment> segments)
|
||||
{
|
||||
super(
|
||||
// _not_ the version, just something uniqueish
|
||||
String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()),
|
||||
dataSource,
|
||||
computeMergedInterval(segments)
|
||||
);
|
||||
|
||||
// Verify segment list is nonempty
|
||||
Preconditions.checkArgument(segments.size() > 0, "segments nonempty");
|
||||
// Verify segments are all in the correct datasource
|
||||
Preconditions.checkArgument(
|
||||
Iterables.size(
|
||||
Iterables.filter(
|
||||
segments,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment segment)
|
||||
{
|
||||
return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource);
|
||||
}
|
||||
}
|
||||
)
|
||||
) == 0, "segments in the wrong datasource"
|
||||
);
|
||||
// Verify segments are all unsharded
|
||||
Preconditions.checkArgument(
|
||||
Iterables.size(
|
||||
Iterables.filter(
|
||||
segments,
|
||||
new Predicate<DataSegment>()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(@Nullable DataSegment segment)
|
||||
{
|
||||
return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec);
|
||||
}
|
||||
}
|
||||
)
|
||||
) == 0, "segments without NoneShardSpec"
|
||||
);
|
||||
|
||||
this.segments = segments;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
|
||||
final ServiceEmitter emitter = toolbox.getEmitter();
|
||||
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
|
||||
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
|
||||
final File taskDir = toolbox.getTaskDir();
|
||||
|
||||
try {
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
||||
log.info(
|
||||
"Starting merge of id[%s], segments: %s",
|
||||
getId(),
|
||||
Lists.transform(
|
||||
segments,
|
||||
new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(@Nullable DataSegment input)
|
||||
{
|
||||
return input.getIdentifier();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
// download segments to merge
|
||||
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
|
||||
|
||||
// merge files together
|
||||
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
|
||||
|
||||
emitter.emit(builder.build("merger/numMerged", segments.size()));
|
||||
emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime));
|
||||
|
||||
log.info(
|
||||
"[%s] : Merged %d segments in %,d millis",
|
||||
mergedSegment.getDataSource(),
|
||||
segments.size(),
|
||||
System.currentTimeMillis() - startTime
|
||||
);
|
||||
|
||||
long uploadStart = System.currentTimeMillis();
|
||||
|
||||
// Upload file
|
||||
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment);
|
||||
|
||||
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
|
||||
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
|
||||
|
||||
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
|
||||
|
||||
return TaskStatus.success(getId());
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource())
|
||||
.addData("interval", mergedSegment.getInterval())
|
||||
.emit();
|
||||
|
||||
return TaskStatus.failure(getId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks pre-existing segments in "context" to confirm that this merge query is valid. Specifically, confirm that
|
||||
* we are operating on every segment that overlaps the chosen interval.
|
||||
*/
|
||||
@Override
|
||||
public TaskStatus preflight(TaskToolbox toolbox)
|
||||
{
|
||||
final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(DataSegment dataSegment)
|
||||
{
|
||||
return dataSegment.getIdentifier();
|
||||
}
|
||||
};
|
||||
|
||||
final Set<String> current = ImmutableSet.copyOf(
|
||||
Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier)
|
||||
);
|
||||
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
|
||||
|
||||
final Set<String> missingFromRequested = Sets.difference(current, requested);
|
||||
if (!missingFromRequested.isEmpty()) {
|
||||
throw new ISE(
|
||||
"Merge is invalid: current segment(s) are not in the requested set: %s",
|
||||
Joiner.on(", ").join(missingFromRequested)
|
||||
);
|
||||
}
|
||||
|
||||
final Set<String> missingFromCurrent = Sets.difference(requested, current);
|
||||
if (!missingFromCurrent.isEmpty()) {
|
||||
throw new ISE(
|
||||
"Merge is invalid: requested segment(s) are not in the current set: %s",
|
||||
Joiner.on(", ").join(missingFromCurrent)
|
||||
);
|
||||
}
|
||||
|
||||
return TaskStatus.running(getId());
|
||||
|
||||
}
|
||||
|
||||
protected abstract File merge(Map<DataSegment, File> segments, File outDir)
|
||||
throws Exception;
|
||||
|
||||
@JsonProperty
|
||||
public List<DataSegment> getSegments()
|
||||
{
|
||||
return segments;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return Objects.toStringHelper(this)
|
||||
.add("id", getId())
|
||||
.add("dataSource", getDataSource())
|
||||
.add("interval", getImplicitLockInterval())
|
||||
.add("segments", segments)
|
||||
.toString();
|
||||
}
|
||||
|
||||
private static String computeProcessingID(final String dataSource, final List<DataSegment> segments)
|
||||
{
|
||||
final String segmentIDs = Joiner.on("_").join(
|
||||
Iterables.transform(
|
||||
Ordering.natural().sortedCopy(segments), new Function<DataSegment, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(DataSegment x)
|
||||
{
|
||||
return String.format(
|
||||
"%s_%s_%s_%s",
|
||||
x.getInterval().getStart(),
|
||||
x.getInterval().getEnd(),
|
||||
x.getVersion(),
|
||||
x.getShardSpec().getPartitionNum()
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
return String.format(
|
||||
"%s_%s",
|
||||
dataSource,
|
||||
Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase()
|
||||
);
|
||||
}
|
||||
|
||||
private static Interval computeMergedInterval(final List<DataSegment> segments)
|
||||
{
|
||||
Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0");
|
||||
|
||||
DateTime start = null;
|
||||
DateTime end = null;
|
||||
|
||||
for(final DataSegment segment : segments) {
|
||||
if(start == null || segment.getInterval().getStart().isBefore(start)) {
|
||||
start = segment.getInterval().getStart();
|
||||
}
|
||||
|
||||
if(end == null || segment.getInterval().getEnd().isAfter(end)) {
|
||||
end = segment.getInterval().getEnd();
|
||||
}
|
||||
}
|
||||
|
||||
return new Interval(start, end);
|
||||
}
|
||||
|
||||
private static DataSegment computeMergedSegment(
|
||||
final String dataSource,
|
||||
final String version,
|
||||
final List<DataSegment> segments
|
||||
)
|
||||
{
|
||||
final Interval mergedInterval = computeMergedInterval(segments);
|
||||
final Set<String> mergedDimensions = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
final Set<String> mergedMetrics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
|
||||
for (DataSegment segment : segments) {
|
||||
mergedDimensions.addAll(segment.getDimensions());
|
||||
mergedMetrics.addAll(segment.getMetrics());
|
||||
}
|
||||
|
||||
return DataSegment.builder()
|
||||
.dataSource(dataSource)
|
||||
.interval(mergedInterval)
|
||||
.version(version)
|
||||
.shardSpec(new NoneShardSpec())
|
||||
.dimensions(Lists.newArrayList(mergedDimensions))
|
||||
.metrics(Lists.newArrayList(mergedMetrics))
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -41,10 +41,10 @@ import org.joda.time.Interval;
|
|||
* to release locks early if they desire.</li>
|
||||
* </ul>
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultMergeTask.class)
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
|
||||
@JsonSubTypes.Type(name = "merge", value = DefaultMergeTask.class),
|
||||
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
|
||||
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
|
||||
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
|
||||
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
|
||||
|
@ -52,7 +52,7 @@ import org.joda.time.Interval;
|
|||
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
|
||||
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
|
||||
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
|
||||
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class)
|
||||
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class)
|
||||
})
|
||||
public interface Task
|
||||
{
|
||||
|
|
|
@ -1,94 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merger.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class VersionConverterSubTask extends AbstractTask
|
||||
{
|
||||
private static final Logger log = new Logger(VersionConverterSubTask.class);
|
||||
|
||||
private final DataSegment segment;
|
||||
|
||||
protected VersionConverterSubTask(
|
||||
@JsonProperty("groupId") String groupId,
|
||||
@JsonProperty("segment") DataSegment segment
|
||||
)
|
||||
{
|
||||
super(
|
||||
joinId(
|
||||
groupId,
|
||||
"sub",
|
||||
segment.getInterval().getStart(),
|
||||
segment.getInterval().getEnd(),
|
||||
segment.getShardSpec().getPartitionNum()
|
||||
),
|
||||
groupId,
|
||||
segment.getDataSource(),
|
||||
segment.getInterval()
|
||||
);
|
||||
this.segment = segment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "version_converter_sub";
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
log.info("Converting segment[%s]", segment);
|
||||
final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
|
||||
|
||||
final File location = localSegments.get(segment);
|
||||
final File outLocation = new File(location, "v9_out");
|
||||
if (IndexIO.convertSegment(location, outLocation)) {
|
||||
final int outVersion = IndexIO.getVersionFromDir(outLocation);
|
||||
|
||||
// Appending to the version makes a new version that inherits most comparability parameters of the original
|
||||
// version, but is "newer" than said original version.
|
||||
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
|
||||
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
|
||||
|
||||
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
|
||||
}
|
||||
else {
|
||||
log.info("Conversion failed.");
|
||||
}
|
||||
|
||||
return success();
|
||||
}
|
||||
}
|
|
@ -22,19 +22,26 @@ package com.metamx.druid.merger.common.task;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.druid.loading.SegmentLoadingException;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
|
||||
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
|
||||
import com.metamx.druid.merger.common.actions.TaskActionClient;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -44,10 +51,12 @@ public class VersionConverterTask extends AbstractTask
|
|||
private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);
|
||||
|
||||
private static final Logger log = new Logger(VersionConverterTask.class);
|
||||
private final DataSegment segment;
|
||||
|
||||
public VersionConverterTask(
|
||||
@JsonProperty("dataSource") String dataSource,
|
||||
@JsonProperty("interval") Interval interval
|
||||
@JsonProperty("interval") Interval interval,
|
||||
@JsonProperty("segment") DataSegment segment
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -55,6 +64,8 @@ public class VersionConverterTask extends AbstractTask
|
|||
dataSource,
|
||||
interval
|
||||
);
|
||||
|
||||
this.segment = segment;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,12 +77,22 @@ public class VersionConverterTask extends AbstractTask
|
|||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
throw new ISE("Should never run, %s just exists to create subtasks", this.getClass().getSimpleName());
|
||||
if (segment == null) {
|
||||
throw new ISE("Segment was null, this should never run.", this.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
log.info("I'm in a subless mood.");
|
||||
convertSegment(toolbox, segment);
|
||||
return success();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
if (segment != null) {
|
||||
return super.preflight(toolbox);
|
||||
}
|
||||
|
||||
final TaskActionClient taskClient = toolbox.getTaskActionClientFactory();
|
||||
|
||||
List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
|
||||
|
@ -86,7 +107,7 @@ public class VersionConverterTask extends AbstractTask
|
|||
{
|
||||
final Integer segmentVersion = segment.getBinaryVersion();
|
||||
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
|
||||
return new VersionConverterSubTask(getGroupId(), segment);
|
||||
return new SubTask(getGroupId(), segment);
|
||||
}
|
||||
|
||||
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
|
||||
|
@ -99,4 +120,65 @@ public class VersionConverterTask extends AbstractTask
|
|||
|
||||
return TaskStatus.success(getId());
|
||||
}
|
||||
|
||||
public static class SubTask extends AbstractTask
|
||||
{
|
||||
private final DataSegment segment;
|
||||
|
||||
protected SubTask(
|
||||
@JsonProperty("groupId") String groupId,
|
||||
@JsonProperty("segment") DataSegment segment
|
||||
)
|
||||
{
|
||||
super(
|
||||
joinId(
|
||||
groupId,
|
||||
"sub",
|
||||
segment.getInterval().getStart(),
|
||||
segment.getInterval().getEnd(),
|
||||
segment.getShardSpec().getPartitionNum()
|
||||
),
|
||||
groupId,
|
||||
segment.getDataSource(),
|
||||
segment.getInterval()
|
||||
);
|
||||
this.segment = segment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "version_converter_sub";
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
log.info("Subs are good! Italian BMT and Meatball are probably my favorite.");
|
||||
convertSegment(toolbox, segment);
|
||||
return success();
|
||||
}
|
||||
}
|
||||
|
||||
private static void convertSegment(TaskToolbox toolbox, final DataSegment segment)
|
||||
throws SegmentLoadingException, IOException
|
||||
{
|
||||
log.info("Converting segment[%s]", segment);
|
||||
final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
|
||||
|
||||
final File location = localSegments.get(segment);
|
||||
final File outLocation = new File(location, "v9_out");
|
||||
if (IndexIO.convertSegment(location, outLocation)) {
|
||||
final int outVersion = IndexIO.getVersionFromDir(outLocation);
|
||||
|
||||
// Appending to the version makes a new version that inherits most comparability parameters of the original
|
||||
// version, but is "newer" than said original version.
|
||||
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
|
||||
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
|
||||
|
||||
toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
|
||||
} else {
|
||||
log.info("Conversion failed.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.merger.coordinator.config;
|
||||
|
||||
import org.joda.time.Duration;
|
||||
import org.skife.config.Config;
|
||||
import org.skife.config.Default;
|
||||
|
||||
/**
 * Configuration accessors for the worker setup manager. Each accessor is bound to a property key
 * through its {@code @Config} annotation.
 */
|
||||
public abstract class WorkerSetupManagerConfig
|
||||
{
|
||||
/** Value of the {@code druid.indexer.configTable} property. */
@Config("druid.indexer.configTable")
|
||||
public abstract String getConfigTable();
|
||||
|
||||
/** Value of the {@code druid.indexer.workerSetupConfigName} property. */
@Config("druid.indexer.workerSetupConfigName")
|
||||
public abstract String getWorkerSetupConfigName();
|
||||
|
||||
/** Value of the {@code druid.indexer.poll.duration} property; declared default is {@code PT1M}. */
@Config("druid.indexer.poll.duration")
|
||||
@Default("PT1M")
|
||||
public abstract Duration getPollDuration();
|
||||
}
|
|
@ -39,6 +39,9 @@ import com.metamx.common.lifecycle.LifecycleStart;
|
|||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.RegisteringNode;
|
||||
import com.metamx.druid.config.ConfigManager;
|
||||
import com.metamx.druid.config.ConfigManagerConfig;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
import com.metamx.druid.db.DbConnector;
|
||||
import com.metamx.druid.db.DbConnectorConfig;
|
||||
import com.metamx.druid.http.GuiceServletConfig;
|
||||
|
@ -77,7 +80,6 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
|
|||
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
|
||||
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
|
||||
import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy;
|
||||
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
|
||||
import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy;
|
||||
|
@ -556,18 +558,12 @@ public class IndexerCoordinatorNode extends RegisteringNode
|
|||
public void initializeWorkerSetupManager()
|
||||
{
|
||||
if (workerSetupManager == null) {
|
||||
final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
|
||||
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
|
||||
final ConfigManager configManager = new ConfigManager(dbi, configManagerConfig);
|
||||
lifecycle.addManagedInstance(configManager);
|
||||
|
||||
DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());
|
||||
workerSetupManager = new WorkerSetupManager(
|
||||
dbi, Executors.newScheduledThreadPool(
|
||||
1,
|
||||
new ThreadFactoryBuilder()
|
||||
.setDaemon(true)
|
||||
.setNameFormat("WorkerSetupManagerExec--%d")
|
||||
.build()
|
||||
), jsonMapper, workerSetupManagerConfig
|
||||
);
|
||||
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());
|
||||
workerSetupManager = new WorkerSetupManager(new JacksonConfigManager(configManager, jsonMapper));
|
||||
}
|
||||
lifecycle.addManagedInstance(workerSetupManager);
|
||||
}
|
||||
|
|
|
@ -95,18 +95,15 @@ public class IndexerCoordinatorResource
|
|||
@Produces("application/json")
|
||||
public Response doIndex(final Task task)
|
||||
{
|
||||
// verify against whitelist
|
||||
if (config.isWhitelistEnabled() && !config.getWhitelistDatasources().contains(task.getDataSource())) {
|
||||
return Response.status(Response.Status.BAD_REQUEST)
|
||||
.entity(
|
||||
ImmutableMap.of(
|
||||
"error",
|
||||
String.format("dataSource[%s] is not whitelisted", task.getDataSource())
|
||||
)
|
||||
)
|
||||
.build();
|
||||
return taskPost(task);
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/task")
|
||||
@Consumes("application/json")
|
||||
@Produces("application/json")
|
||||
public Response taskPost(final Task task)
|
||||
{
|
||||
taskMasterLifecycle.getTaskQueue().add(task);
|
||||
return Response.ok(ImmutableMap.of("task", task.getId())).build();
|
||||
}
|
||||
|
|
|
@ -19,209 +19,41 @@
|
|||
|
||||
package com.metamx.druid.merger.coordinator.setup;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.concurrent.ScheduledExecutors;
|
||||
import com.metamx.common.lifecycle.LifecycleStart;
|
||||
import com.metamx.common.lifecycle.LifecycleStop;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
|
||||
import org.joda.time.Duration;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.StatementContext;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WorkerSetupManager
|
||||
{
|
||||
private static final Logger log = new Logger(WorkerSetupManager.class);
|
||||
private static final String WORKER_SETUP_KEY = "worker.setup";
|
||||
|
||||
private final DBI dbi;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ScheduledExecutorService exec;
|
||||
private final WorkerSetupManagerConfig config;
|
||||
|
||||
private final Object lock = new Object();
|
||||
private final JacksonConfigManager configManager;
|
||||
|
||||
private volatile AtomicReference<WorkerSetupData> workerSetupData = new AtomicReference<WorkerSetupData>(null);
|
||||
private volatile boolean started = false;
|
||||
|
||||
public WorkerSetupManager(
|
||||
DBI dbi,
|
||||
ScheduledExecutorService exec,
|
||||
ObjectMapper jsonMapper,
|
||||
WorkerSetupManagerConfig config
|
||||
JacksonConfigManager configManager
|
||||
)
|
||||
{
|
||||
this.dbi = dbi;
|
||||
this.exec = exec;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.config = config;
|
||||
this.configManager = configManager;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (started) {
|
||||
return;
|
||||
workerSetupData = configManager.watch(WORKER_SETUP_KEY, WorkerSetupData.class);
|
||||
}
|
||||
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec,
|
||||
new Duration(0),
|
||||
config.getPollDuration(),
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
poll();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
|
||||
public void poll()
|
||||
{
|
||||
try {
|
||||
List<WorkerSetupData> setupDataList = dbi.withHandle(
|
||||
new HandleCallback<List<WorkerSetupData>>()
|
||||
{
|
||||
@Override
|
||||
public List<WorkerSetupData> withHandle(Handle handle) throws Exception
|
||||
{
|
||||
return handle.createQuery(
|
||||
String.format(
|
||||
"SELECT payload FROM %s WHERE name = :name",
|
||||
config.getConfigTable()
|
||||
)
|
||||
)
|
||||
.bind("name", config.getWorkerSetupConfigName())
|
||||
.fold(
|
||||
Lists.<WorkerSetupData>newArrayList(),
|
||||
new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>()
|
||||
{
|
||||
@Override
|
||||
public ArrayList<WorkerSetupData> fold(
|
||||
ArrayList<WorkerSetupData> workerNodeConfigurations,
|
||||
Map<String, Object> stringObjectMap,
|
||||
FoldController foldController,
|
||||
StatementContext statementContext
|
||||
) throws SQLException
|
||||
{
|
||||
try {
|
||||
// stringObjectMap lowercases and jackson may fail serde
|
||||
workerNodeConfigurations.add(
|
||||
jsonMapper.readValue(
|
||||
MapUtils.getString(stringObjectMap, "payload"),
|
||||
WorkerSetupData.class
|
||||
)
|
||||
);
|
||||
return workerNodeConfigurations;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
if (setupDataList.isEmpty()) {
|
||||
throw new ISE("WTF?! No configuration found for worker nodes!");
|
||||
} else if (setupDataList.size() != 1) {
|
||||
throw new ISE("WTF?! Found more than one configuration for worker nodes");
|
||||
}
|
||||
|
||||
workerSetupData.set(setupDataList.get(0));
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception while polling for worker setup data!");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public WorkerSetupData getWorkerSetupData()
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (!started) {
|
||||
throw new ISE("Must start WorkerSetupManager first!");
|
||||
}
|
||||
|
||||
return workerSetupData.get();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean setWorkerSetupData(final WorkerSetupData value)
|
||||
{
|
||||
synchronized (lock) {
|
||||
try {
|
||||
if (!started) {
|
||||
throw new ISE("Must start WorkerSetupManager first!");
|
||||
}
|
||||
|
||||
dbi.withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
handle.createStatement(
|
||||
String.format(
|
||||
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
|
||||
config.getConfigTable()
|
||||
)
|
||||
)
|
||||
.bind("name", config.getWorkerSetupConfigName())
|
||||
.bind("payload", jsonMapper.writeValueAsString(value))
|
||||
.execute();
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
workerSetupData.set(value);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception updating worker config");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
return configManager.set(WORKER_SETUP_KEY, value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,14 +26,14 @@ import com.metamx.druid.aggregation.AggregatorFactory;
|
|||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.merger.common.TaskStatus;
|
||||
import com.metamx.druid.merger.common.TaskToolbox;
|
||||
import com.metamx.druid.merger.common.task.DefaultMergeTask;
|
||||
import com.metamx.druid.merger.common.task.MergeTask;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeName("test")
|
||||
public class TestTask extends DefaultMergeTask
|
||||
public class TestTask extends MergeTask
|
||||
{
|
||||
private final String id;
|
||||
private final TaskStatus status;
|
||||
|
|
|
@ -31,7 +31,7 @@ import java.io.File;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class MergeTaskTest
|
||||
public class MergeTaskBaseTest
|
||||
{
|
||||
private final DataSegment.Builder segmentBuilder = DataSegment.builder()
|
||||
.dataSource("foo")
|
||||
|
@ -43,7 +43,7 @@ public class MergeTaskTest
|
|||
.add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build())
|
||||
.build();
|
||||
|
||||
final MergeTask testMergeTask = new MergeTask("foo", segments)
|
||||
final MergeTaskBase testMergeTaskBase = new MergeTaskBase("foo", segments)
|
||||
{
|
||||
@Override
|
||||
protected File merge(Map<DataSegment, File> segments, File outDir) throws Exception
|
||||
|
@ -61,13 +61,13 @@ public class MergeTaskTest
|
|||
@Test
|
||||
public void testDataSource()
|
||||
{
|
||||
Assert.assertEquals("foo", testMergeTask.getDataSource());
|
||||
Assert.assertEquals("foo", testMergeTaskBase.getDataSource());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterval()
|
||||
{
|
||||
Assert.assertEquals(new Interval("2012-01-03/2012-01-07"), testMergeTask.getImplicitLockInterval().get());
|
||||
Assert.assertEquals(new Interval("2012-01-03/2012-01-07"), testMergeTaskBase.getImplicitLockInterval().get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -81,7 +81,7 @@ public class MergeTaskTest
|
|||
).toString().toLowerCase() + "_";
|
||||
Assert.assertEquals(
|
||||
desiredPrefix,
|
||||
testMergeTask.getId().substring(0, desiredPrefix.length())
|
||||
testMergeTaskBase.getId().substring(0, desiredPrefix.length())
|
||||
);
|
||||
}
|
||||
}
|
2
pom.xml
2
pom.xml
|
@ -38,7 +38,7 @@
|
|||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<metamx.java-util.version>0.20.0</metamx.java-util.version>
|
||||
<metamx.java-util.version>0.20.1-SNAPSHOT</metamx.java-util.version>
|
||||
</properties>
|
||||
|
||||
<modules>
|
||||
|
|
|
@ -34,8 +34,8 @@ import com.metamx.druid.coordination.DruidClusterInfo;
|
|||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.master.rules.Rule;
|
||||
import com.metamx.druid.merge.ClientKillQuery;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -66,6 +66,7 @@ public class InfoResource
|
|||
private final DatabaseSegmentManager databaseSegmentManager;
|
||||
private final DatabaseRuleManager databaseRuleManager;
|
||||
private final DruidClusterInfo druidClusterInfo;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
@Inject
|
||||
public InfoResource(
|
||||
|
@ -73,7 +74,8 @@ public class InfoResource
|
|||
ServerInventoryManager serverInventoryManager,
|
||||
DatabaseSegmentManager databaseSegmentManager,
|
||||
DatabaseRuleManager databaseRuleManager,
|
||||
DruidClusterInfo druidClusterInfo
|
||||
DruidClusterInfo druidClusterInfo,
|
||||
IndexingServiceClient indexingServiceClient
|
||||
)
|
||||
{
|
||||
this.master = master;
|
||||
|
@ -81,6 +83,7 @@ public class InfoResource
|
|||
this.databaseSegmentManager = databaseSegmentManager;
|
||||
this.databaseRuleManager = databaseRuleManager;
|
||||
this.druidClusterInfo = druidClusterInfo;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
@GET
|
||||
|
@ -374,7 +377,7 @@ public class InfoResource
|
|||
)
|
||||
{
|
||||
if (kill != null && Boolean.valueOf(kill)) {
|
||||
master.killSegments(new ClientKillQuery(dataSourceName, new Interval(interval)));
|
||||
indexingServiceClient.killSegments(dataSourceName, new Interval(interval));
|
||||
} else {
|
||||
if (!databaseSegmentManager.removeDatasource(dataSourceName)) {
|
||||
return Response.status(Response.Status.NOT_FOUND).build();
|
||||
|
|
|
@ -33,6 +33,9 @@ import com.metamx.common.lifecycle.Lifecycle;
|
|||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.client.ServerInventoryManager;
|
||||
import com.metamx.druid.client.ServerInventoryManagerConfig;
|
||||
import com.metamx.druid.config.ConfigManager;
|
||||
import com.metamx.druid.config.ConfigManagerConfig;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.coordination.DruidClusterInfoConfig;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
|
@ -49,6 +52,7 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
|
|||
import com.metamx.druid.log.LogLevelAdjuster;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.master.DruidMasterConfig;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.master.LoadQueuePeon;
|
||||
import com.metamx.druid.utils.PropUtils;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
@ -86,7 +90,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
*/
|
||||
public class MasterMain
|
||||
{
|
||||
private static final Logger log = new Logger(ServerMain.class);
|
||||
private static final Logger log = new Logger(MasterMain.class);
|
||||
|
||||
public static void main(String[] args) throws Exception
|
||||
{
|
||||
|
@ -166,13 +170,14 @@ public class MasterMain
|
|||
lifecycle
|
||||
);
|
||||
|
||||
ServiceProvider serviceProvider = null;
|
||||
IndexingServiceClient indexingServiceClient = null;
|
||||
if (druidMasterConfig.getMergerServiceName() != null) {
|
||||
serviceProvider = Initialization.makeServiceProvider(
|
||||
ServiceProvider serviceProvider = Initialization.makeServiceProvider(
|
||||
druidMasterConfig.getMergerServiceName(),
|
||||
serviceDiscovery,
|
||||
lifecycle
|
||||
);
|
||||
indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider);
|
||||
}
|
||||
|
||||
final DruidClusterInfo druidClusterInfo = new DruidClusterInfo(
|
||||
|
@ -180,10 +185,14 @@ public class MasterMain
|
|||
masterYp
|
||||
);
|
||||
|
||||
JacksonConfigManager configManager = new JacksonConfigManager(
|
||||
new ConfigManager(dbi, configFactory.build(ConfigManagerConfig.class)), jsonMapper
|
||||
);
|
||||
|
||||
final DruidMaster master = new DruidMaster(
|
||||
druidMasterConfig,
|
||||
druidClusterInfo,
|
||||
jsonMapper,
|
||||
configManager,
|
||||
databaseSegmentManager,
|
||||
serverInventoryManager,
|
||||
databaseRuleManager,
|
||||
|
@ -191,9 +200,7 @@ public class MasterMain
|
|||
emitter,
|
||||
scheduledExecutorFactory,
|
||||
new ConcurrentHashMap<String, LoadQueuePeon>(),
|
||||
serviceProvider,
|
||||
httpClient,
|
||||
new ToStringResponseHandler(Charsets.UTF_8)
|
||||
indexingServiceClient
|
||||
);
|
||||
lifecycle.addManagedInstance(master);
|
||||
|
||||
|
@ -226,7 +233,8 @@ public class MasterMain
|
|||
databaseRuleManager,
|
||||
druidClusterInfo,
|
||||
master,
|
||||
jsonMapper
|
||||
jsonMapper,
|
||||
indexingServiceClient
|
||||
)
|
||||
);
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ package com.metamx.druid.http;
|
|||
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.master.LoadPeonCallback;
|
||||
import com.metamx.druid.merge.ClientKillQuery;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.ws.rs.Consumes;
|
||||
|
|
|
@ -27,6 +27,7 @@ import com.metamx.druid.coordination.DruidClusterInfo;
|
|||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.master.DruidMaster;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.sun.jersey.guice.JerseyServletModule;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
|
||||
|
@ -44,6 +45,7 @@ public class MasterServletModule extends JerseyServletModule
|
|||
private final DruidClusterInfo druidClusterInfo;
|
||||
private final DruidMaster master;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
|
||||
public MasterServletModule(
|
||||
ServerInventoryManager serverInventoryManager,
|
||||
|
@ -51,7 +53,8 @@ public class MasterServletModule extends JerseyServletModule
|
|||
DatabaseRuleManager databaseRuleManager,
|
||||
DruidClusterInfo druidClusterInfo,
|
||||
DruidMaster master,
|
||||
ObjectMapper jsonMapper
|
||||
ObjectMapper jsonMapper,
|
||||
IndexingServiceClient indexingServiceClient
|
||||
)
|
||||
{
|
||||
this.serverInventoryManager = serverInventoryManager;
|
||||
|
@ -60,6 +63,7 @@ public class MasterServletModule extends JerseyServletModule
|
|||
this.druidClusterInfo = druidClusterInfo;
|
||||
this.master = master;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -72,6 +76,7 @@ public class MasterServletModule extends JerseyServletModule
|
|||
bind(DatabaseRuleManager.class).toInstance(databaseRuleManager);
|
||||
bind(DruidMaster.class).toInstance(master);
|
||||
bind(DruidClusterInfo.class).toInstance(druidClusterInfo);
|
||||
bind(IndexingServiceClient.class).toInstance(indexingServiceClient);
|
||||
|
||||
serve("/*").with(GuiceContainer.class);
|
||||
}
|
||||
|
|
|
@ -19,10 +19,8 @@
|
|||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -41,32 +39,30 @@ import com.metamx.druid.client.DataSegment;
|
|||
import com.metamx.druid.client.DruidDataSource;
|
||||
import com.metamx.druid.client.DruidServer;
|
||||
import com.metamx.druid.client.ServerInventoryManager;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.config.JacksonConfigManager;
|
||||
import com.metamx.druid.coordination.DruidClusterInfo;
|
||||
import com.metamx.druid.db.DatabaseRuleManager;
|
||||
import com.metamx.druid.db.DatabaseSegmentManager;
|
||||
import com.metamx.druid.merge.ClientKillQuery;
|
||||
import com.metamx.druid.index.v1.IndexIO;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import com.metamx.emitter.service.ServiceEmitter;
|
||||
import com.metamx.emitter.service.ServiceMetricEvent;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.HttpResponseHandler;
|
||||
import com.metamx.phonebook.PhoneBook;
|
||||
import com.metamx.phonebook.PhoneBookPeon;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -83,26 +79,22 @@ public class DruidMaster
|
|||
|
||||
private final DruidMasterConfig config;
|
||||
private final DruidClusterInfo clusterInfo;
|
||||
private final JacksonConfigManager configManager;
|
||||
private final DatabaseSegmentManager databaseSegmentManager;
|
||||
private final ServerInventoryManager serverInventoryManager;
|
||||
private final DatabaseRuleManager databaseRuleManager;
|
||||
private final PhoneBook yp;
|
||||
private final ServiceEmitter emitter;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
private final ScheduledExecutorService exec;
|
||||
private final ScheduledExecutorService peonExec;
|
||||
private final PhoneBookPeon masterPeon;
|
||||
private final Map<String, LoadQueuePeon> loadManagementPeons;
|
||||
private final ServiceProvider serviceProvider;
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final HttpResponseHandler responseHandler;
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
public DruidMaster(
|
||||
DruidMasterConfig config,
|
||||
DruidClusterInfo clusterInfo,
|
||||
ObjectMapper jsonMapper,
|
||||
JacksonConfigManager configManager,
|
||||
DatabaseSegmentManager databaseSegmentManager,
|
||||
ServerInventoryManager serverInventoryManager,
|
||||
DatabaseRuleManager databaseRuleManager,
|
||||
|
@ -110,31 +102,25 @@ public class DruidMaster
|
|||
ServiceEmitter emitter,
|
||||
ScheduledExecutorFactory scheduledExecutorFactory,
|
||||
Map<String, LoadQueuePeon> loadManagementPeons,
|
||||
ServiceProvider serviceProvider,
|
||||
HttpClient httpClient,
|
||||
HttpResponseHandler responseHandler
|
||||
IndexingServiceClient indexingServiceClient
|
||||
)
|
||||
{
|
||||
this.config = config;
|
||||
this.clusterInfo = clusterInfo;
|
||||
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.configManager = configManager;
|
||||
|
||||
this.databaseSegmentManager = databaseSegmentManager;
|
||||
this.serverInventoryManager = serverInventoryManager;
|
||||
this.databaseRuleManager = databaseRuleManager;
|
||||
this.yp = zkPhoneBook;
|
||||
this.emitter = emitter;
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
|
||||
this.masterPeon = new MasterListeningPeon();
|
||||
this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d");
|
||||
this.peonExec = scheduledExecutorFactory.create(1, "Master-PeonExec--%d");
|
||||
|
||||
this.loadManagementPeons = loadManagementPeons;
|
||||
|
||||
this.serviceProvider = serviceProvider;
|
||||
this.httpClient = httpClient;
|
||||
this.responseHandler = responseHandler;
|
||||
}
|
||||
|
||||
public boolean isClusterMaster()
|
||||
|
@ -349,27 +335,6 @@ public class DruidMaster
|
|||
}
|
||||
}
|
||||
|
||||
public void killSegments(ClientKillQuery killQuery)
|
||||
{
|
||||
try {
|
||||
httpClient.post(
|
||||
new URL(
|
||||
String.format(
|
||||
"http://%s:%s/mmx/merger/v1/index",
|
||||
serviceProvider.getInstance().getAddress(),
|
||||
serviceProvider.getInstance().getPort()
|
||||
)
|
||||
)
|
||||
)
|
||||
.setContent("application/json", jsonMapper.writeValueAsBytes(killQuery))
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Set<DataSegment> getAvailableDataSegments()
|
||||
{
|
||||
Set<DataSegment> availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
|
||||
|
@ -390,7 +355,9 @@ public class DruidMaster
|
|||
|
||||
for (DataSegment dataSegment : dataSegments) {
|
||||
if (dataSegment.getSize() < 0) {
|
||||
log.warn("No size on Segment[%s], wtf?", dataSegment);
|
||||
log.makeAlert("No size on Segment, wtf?")
|
||||
.addData("segment", dataSegment)
|
||||
.emit();
|
||||
}
|
||||
availableSegments.add(dataSegment);
|
||||
}
|
||||
|
@ -466,8 +433,14 @@ public class DruidMaster
|
|||
final List<Pair<? extends MasterRunnable, Duration>> masterRunnables = Lists.newArrayList();
|
||||
|
||||
masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod()));
|
||||
if (config.isMergeSegments() && serviceProvider != null) {
|
||||
masterRunnables.add(Pair.of(new MasterSegmentMergerRunnable(), config.getMasterSegmentMergerPeriod()));
|
||||
if (config.isMergeSegments() && indexingServiceClient != null) {
|
||||
|
||||
masterRunnables.add(
|
||||
Pair.of(
|
||||
new MasterSegmentMergerRunnable(configManager.watch(MergerWhitelist.CONFIG_KEY, MergerWhitelist.class)),
|
||||
config.getMasterSegmentMergerPeriod()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
for (final Pair<? extends MasterRunnable, Duration> masterRunnable : masterRunnables) {
|
||||
|
@ -529,6 +502,39 @@ public class DruidMaster
|
|||
}
|
||||
}
|
||||
|
||||
public static class DruidMasterVersionConverter implements DruidMasterHelper
|
||||
{
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
private final AtomicReference<MergerWhitelist> whitelistRef;
|
||||
|
||||
public DruidMasterVersionConverter(
|
||||
IndexingServiceClient indexingServiceClient,
|
||||
AtomicReference<MergerWhitelist> whitelistRef
|
||||
)
|
||||
{
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
this.whitelistRef = whitelistRef;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
||||
{
|
||||
MergerWhitelist whitelist = whitelistRef.get();
|
||||
|
||||
for (DataSegment dataSegment : params.getAvailableSegments()) {
|
||||
if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {
|
||||
final Integer binaryVersion = dataSegment.getBinaryVersion();
|
||||
|
||||
if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) {
|
||||
indexingServiceClient.upgradeSegment(dataSegment);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return params;
|
||||
}
|
||||
}
|
||||
|
||||
private class MasterListeningPeon implements PhoneBookPeon<Map>
|
||||
{
|
||||
@Override
|
||||
|
@ -723,12 +729,13 @@ public class DruidMaster
|
|||
|
||||
private class MasterSegmentMergerRunnable extends MasterRunnable
|
||||
{
|
||||
private MasterSegmentMergerRunnable()
|
||||
private MasterSegmentMergerRunnable(final AtomicReference<MergerWhitelist> whitelistRef)
|
||||
{
|
||||
super(
|
||||
ImmutableList.of(
|
||||
new DruidMasterSegmentInfoLoader(DruidMaster.this),
|
||||
new DruidMasterSegmentMerger(jsonMapper, serviceProvider),
|
||||
new DruidMasterVersionConverter(indexingServiceClient, whitelistRef),
|
||||
new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef),
|
||||
new DruidMasterHelper()
|
||||
{
|
||||
@Override
|
||||
|
@ -739,8 +746,7 @@ public class DruidMaster
|
|||
|
||||
params.getEmitter().emit(
|
||||
new ServiceMetricEvent.Builder().build(
|
||||
"master/merge/count",
|
||||
stats.getGlobalStats().get("mergedCount")
|
||||
"master/merge/count", stats.getGlobalStats().get("mergedCount")
|
||||
)
|
||||
);
|
||||
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.HashMultiset;
|
||||
|
@ -32,22 +30,19 @@ import com.google.common.collect.Multiset;
|
|||
import com.google.common.collect.Ordering;
|
||||
import com.metamx.common.Pair;
|
||||
import com.metamx.common.guava.FunctionalIterable;
|
||||
import com.metamx.common.lifecycle.Lifecycle;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.metamx.druid.TimelineObjectHolder;
|
||||
import com.metamx.druid.VersionedIntervalTimeline;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import com.metamx.druid.partition.PartitionChunk;
|
||||
import com.metamx.http.client.HttpClientConfig;
|
||||
import com.metamx.http.client.HttpClientInit;
|
||||
import com.metamx.http.client.response.ToStringResponseHandler;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -55,34 +50,29 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
|
|||
{
|
||||
private static final Logger log = new Logger(DruidMasterSegmentMerger.class);
|
||||
|
||||
private final MergerClient mergerClient;
|
||||
private final IndexingServiceClient indexingServiceClient;
|
||||
private final AtomicReference<MergerWhitelist> whiteListRef;
|
||||
|
||||
public DruidMasterSegmentMerger(MergerClient mergerClient)
|
||||
public DruidMasterSegmentMerger(
|
||||
IndexingServiceClient indexingServiceClient,
|
||||
AtomicReference<MergerWhitelist> whitelistRef
|
||||
)
|
||||
{
|
||||
this.mergerClient = mergerClient;
|
||||
}
|
||||
|
||||
public DruidMasterSegmentMerger(ObjectMapper jsonMapper, ServiceProvider serviceProvider)
|
||||
{
|
||||
this.mergerClient = new HttpMergerClient(
|
||||
HttpClientInit.createClient(
|
||||
HttpClientConfig.builder().withNumConnections(1).build(),
|
||||
new Lifecycle()
|
||||
),
|
||||
new ToStringResponseHandler(Charsets.UTF_8),
|
||||
jsonMapper,
|
||||
serviceProvider
|
||||
);
|
||||
this.indexingServiceClient = indexingServiceClient;
|
||||
this.whiteListRef = whitelistRef;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
|
||||
{
|
||||
MergerWhitelist whitelist = whiteListRef.get();
|
||||
|
||||
MasterStats stats = new MasterStats();
|
||||
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = Maps.newHashMap();
|
||||
|
||||
// Find serviced segments by using a timeline
|
||||
for (DataSegment dataSegment : params.getAvailableSegments()) {
|
||||
if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {
|
||||
VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(dataSegment.getDataSource());
|
||||
if (timeline == null) {
|
||||
timeline = new VersionedIntervalTimeline<String, DataSegment>(Ordering.<String>natural());
|
||||
|
@ -94,6 +84,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
|
|||
dataSegment.getShardSpec().createChunk(dataSegment)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Find segments to merge
|
||||
for (final Map.Entry<String, VersionedIntervalTimeline<String, DataSegment>> entry : dataSources.entrySet()) {
|
||||
|
@ -161,7 +152,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
|
|||
log.info("[%s] Found %d segments to merge %s", dataSource, segments.size(), segmentNames);
|
||||
|
||||
try {
|
||||
mergerClient.runRequest(dataSource, segments);
|
||||
indexingServiceClient.mergeSegments(segments);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.merge.ClientAppendQuery;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.response.HttpResponseHandler;
|
||||
import com.netflix.curator.x.discovery.ServiceProvider;
|
||||
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
|
||||
public class HttpMergerClient implements MergerClient
|
||||
{
|
||||
private final HttpClient client;
|
||||
private final HttpResponseHandler<StringBuilder, String> responseHandler;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ServiceProvider serviceProvider;
|
||||
|
||||
public HttpMergerClient(
|
||||
HttpClient client,
|
||||
HttpResponseHandler<StringBuilder, String> responseHandler,
|
||||
ObjectMapper jsonMapper,
|
||||
ServiceProvider serviceProvider
|
||||
)
|
||||
{
|
||||
this.client = client;
|
||||
this.responseHandler = responseHandler;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
public void runRequest(String dataSource, List<DataSegment> segments)
|
||||
{
|
||||
try {
|
||||
byte[] dataToSend = jsonMapper.writeValueAsBytes(
|
||||
new ClientAppendQuery(dataSource, segments)
|
||||
);
|
||||
|
||||
client.post(
|
||||
new URL(
|
||||
String.format(
|
||||
"http://%s:%s/mmx/merger/v1/merge",
|
||||
serviceProvider.getInstance().getAddress(),
|
||||
serviceProvider.getInstance().getPort()
|
||||
)
|
||||
)
|
||||
)
|
||||
.setContent("application/json", dataToSend)
|
||||
.go(responseHandler)
|
||||
.get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,13 +19,36 @@
|
|||
|
||||
package com.metamx.druid.master;
|
||||
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonValue;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface MergerClient
|
||||
public class MergerWhitelist
|
||||
{
|
||||
public void runRequest(String dataSource, List<DataSegment> segments);
|
||||
public static final String CONFIG_KEY = "merger.whitelist";
|
||||
|
||||
private final Set<String> dataSources;
|
||||
|
||||
@JsonCreator
|
||||
public MergerWhitelist(Set<String> dataSources)
|
||||
{
|
||||
this.dataSources = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
this.dataSources.addAll(dataSources);
|
||||
}
|
||||
|
||||
@JsonValue
|
||||
public Set<String> getDataSources()
|
||||
{
|
||||
return dataSources;
|
||||
}
|
||||
|
||||
public boolean contains(String val)
|
||||
{
|
||||
return dataSources.contains(val);
|
||||
}
|
||||
}
|
|
@ -23,12 +23,14 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.druid.client.DataSegment;
|
||||
import com.metamx.druid.client.indexing.IndexingServiceClient;
|
||||
import junit.framework.Assert;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class DruidMasterSegmentMergerTest
|
||||
{
|
||||
|
@ -367,11 +369,7 @@ public class DruidMasterSegmentMergerTest
|
|||
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build()
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(
|
||||
ImmutableList.of(segments.get(4), segments.get(5))
|
||||
), merge(segments)
|
||||
);
|
||||
Assert.assertEquals(ImmutableList.of(ImmutableList.of(segments.get(4), segments.get(5))), merge(segments));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -380,16 +378,17 @@ public class DruidMasterSegmentMergerTest
|
|||
private static List<List<DataSegment>> merge(final Collection<DataSegment> segments)
|
||||
{
|
||||
final List<List<DataSegment>> retVal = Lists.newArrayList();
|
||||
final MergerClient mergerClient = new MergerClient()
|
||||
final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null)
|
||||
{
|
||||
@Override
|
||||
public void runRequest(String dataSource, List<DataSegment> segmentsToMerge)
|
||||
public void mergeSegments(List<DataSegment> segmentsToMerge)
|
||||
{
|
||||
retVal.add(segmentsToMerge);
|
||||
}
|
||||
};
|
||||
|
||||
final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(mergerClient);
|
||||
final AtomicReference<MergerWhitelist> whitelistRef = new AtomicReference<MergerWhitelist>(null);
|
||||
final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef);
|
||||
final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder()
|
||||
.withAvailableSegments(ImmutableSet.copyOf(segments))
|
||||
.withMergeBytesLimit(mergeBytesLimit)
|
||||
|
|
|
@ -152,8 +152,6 @@ public class DruidMasterTest
|
|||
new NoopServiceEmitter(),
|
||||
scheduledExecutorFactory,
|
||||
loadManagementPeons,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue