From 1c3ef48f34b7454d6bdd8bba55e079b92e7fc352 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Thu, 7 Mar 2013 12:59:59 -0600 Subject: [PATCH] 1) Adjust the Config stuff that WorkerSetupManager was using to be reusable 2) Use new ConfigManager to pull out a whitelist for the Master --- .../indexing}/ClientAppendQuery.java | 12 +- .../indexing/ClientConversionQuery.java | 57 +++ .../indexing/ClientKillQuery.java} | 27 +- .../indexing/ClientMergeQuery.java} | 16 +- .../indexing/IndexingServiceClient.java | 115 ++++++ .../metamx/druid/merge/ClientKillQuery.java | 41 --- .../metamx/druid/merge/ClientMergeQuery.java | 41 --- .../metamx/druid/config/ConfigManager.java | 259 ++++++++++++++ .../druid/config/ConfigManagerConfig.java | 18 + .../com/metamx/druid/config/ConfigSerde.java | 9 + .../druid/config/JacksonConfigManager.java | 134 +++++++ .../java/com/metamx/druid/db/DbConnector.java | 9 +- .../druid/merger/common/task/AppendTask.java | 2 +- .../merger/common/task/DefaultMergeTask.java | 87 ----- .../druid/merger/common/task/MergeTask.java | 326 +++--------------- .../merger/common/task/MergeTaskBase.java | 315 +++++++++++++++++ .../metamx/druid/merger/common/task/Task.java | 6 +- .../common/task/VersionConverterSubTask.java | 94 ----- .../common/task/VersionConverterTask.java | 88 ++++- .../config/WorkerSetupManagerConfig.java | 39 --- .../http/IndexerCoordinatorNode.java | 20 +- .../http/IndexerCoordinatorResource.java | 19 +- .../coordinator/scaling/ScalingStats.java | 19 + .../coordinator/setup/WorkerSetupManager.java | 184 +--------- .../com/metamx/druid/merger/TestTask.java | 4 +- ...geTaskTest.java => MergeTaskBaseTest.java} | 10 +- pom.xml | 2 +- .../com/metamx/druid/http/InfoResource.java | 9 +- .../com/metamx/druid/http/MasterMain.java | 24 +- .../com/metamx/druid/http/MasterResource.java | 1 - .../druid/http/MasterServletModule.java | 7 +- .../com/metamx/druid/master/DruidMaster.java | 110 +++--- .../master/DruidMasterSegmentMerger.java | 57 ++- .../metamx/druid/master/HttpMergerClient.java | 78 ----- ...MergerClient.java => MergerWhitelist.java} | 29 +- .../master/DruidMasterSegmentMergerTest.java | 15 +- .../metamx/druid/master/DruidMasterTest.java | 2 - 37 files changed, 1271 insertions(+), 1014 deletions(-) rename client/src/main/java/com/metamx/druid/{merge => client/indexing}/ClientAppendQuery.java (92%) create mode 100644 client/src/main/java/com/metamx/druid/client/indexing/ClientConversionQuery.java rename client/src/main/java/com/metamx/druid/{merge/ClientDeleteQuery.java => client/indexing/ClientKillQuery.java} (83%) rename client/src/main/java/com/metamx/druid/{merge/ClientDefaultMergeQuery.java => client/indexing/ClientMergeQuery.java} (90%) create mode 100644 client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java delete mode 100644 client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java delete mode 100644 client/src/main/java/com/metamx/druid/merge/ClientMergeQuery.java create mode 100644 common/src/main/java/com/metamx/druid/config/ConfigManager.java create mode 100644 common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java create mode 100644 common/src/main/java/com/metamx/druid/config/ConfigSerde.java create mode 100644 common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java delete mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/DefaultMergeTask.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java delete mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java delete mode 100644 merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java rename merger/src/test/java/com/metamx/druid/merger/common/task/{MergeTaskTest.java => MergeTaskBaseTest.java} (89%) delete mode 100644 server/src/main/java/com/metamx/druid/master/HttpMergerClient.java rename server/src/main/java/com/metamx/druid/master/{MergerClient.java => MergerWhitelist.java} (57%) diff --git a/client/src/main/java/com/metamx/druid/merge/ClientAppendQuery.java b/client/src/main/java/com/metamx/druid/client/indexing/ClientAppendQuery.java similarity index 92% rename from client/src/main/java/com/metamx/druid/merge/ClientAppendQuery.java rename to client/src/main/java/com/metamx/druid/client/indexing/ClientAppendQuery.java index 5a8e3bdb50f..5f744918a0a 100644 --- a/client/src/main/java/com/metamx/druid/merge/ClientAppendQuery.java +++ b/client/src/main/java/com/metamx/druid/client/indexing/ClientAppendQuery.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.merge; +package com.metamx.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -29,7 +29,7 @@ import java.util.List; /** */ -public class ClientAppendQuery implements ClientMergeQuery +public class ClientAppendQuery { private final String dataSource; private final List segments; @@ -45,14 +45,18 @@ public class ClientAppendQuery implements ClientMergeQuery } @JsonProperty - @Override + public String getType() + { + return "append"; + } + + @JsonProperty public String getDataSource() { return dataSource; } @JsonProperty - @Override public List getSegments() { return segments; diff --git a/client/src/main/java/com/metamx/druid/client/indexing/ClientConversionQuery.java b/client/src/main/java/com/metamx/druid/client/indexing/ClientConversionQuery.java new file mode 100644 index 00000000000..c0b96bb80b4 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/indexing/ClientConversionQuery.java @@ -0,0 +1,57 @@ +package com.metamx.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.client.DataSegment; +import org.joda.time.Interval; + +/** + */ +public class ClientConversionQuery +{ + private final String dataSource; + private final Interval interval; + private final DataSegment segment; + + public ClientConversionQuery( + DataSegment segment + ) + { + this.dataSource = segment.getDataSource(); + this.interval = segment.getInterval(); + this.segment = segment; + } + + public ClientConversionQuery( + String dataSource, + Interval interval + ) + { + this.dataSource = dataSource; + this.interval = interval; + this.segment = null; + } + + @JsonProperty + public String getType() + { + return "version_converter"; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public DataSegment getSegment() + { + return segment; + } +} diff --git a/client/src/main/java/com/metamx/druid/merge/ClientDeleteQuery.java b/client/src/main/java/com/metamx/druid/client/indexing/ClientKillQuery.java similarity index 83% rename from client/src/main/java/com/metamx/druid/merge/ClientDeleteQuery.java rename to client/src/main/java/com/metamx/druid/client/indexing/ClientKillQuery.java index 3acf20a0a43..3ae8dffb225 100644 --- a/client/src/main/java/com/metamx/druid/merge/ClientDeleteQuery.java +++ b/client/src/main/java/com/metamx/druid/client/indexing/ClientKillQuery.java @@ -17,21 +17,21 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.merge; - - +package com.metamx.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.Interval; -public class ClientDeleteQuery +/** + */ +public class ClientKillQuery { private final String dataSource; private final Interval interval; @JsonCreator - public ClientDeleteQuery( + public ClientKillQuery( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) @@ -40,22 +40,21 @@ public class ClientDeleteQuery this.interval = interval; } + @JsonProperty + public String getType() + { + return "kill"; + } + + @JsonProperty public String getDataSource() { return dataSource; } + @JsonProperty public Interval getInterval() { return interval; } - - @Override - public String toString() - { - return "ClientDeleteQuery{" + - "dataSource='" + dataSource + '\'' + - ", interval=" + interval + - '}'; - } } diff --git a/client/src/main/java/com/metamx/druid/merge/ClientDefaultMergeQuery.java b/client/src/main/java/com/metamx/druid/client/indexing/ClientMergeQuery.java similarity index 90% rename from client/src/main/java/com/metamx/druid/merge/ClientDefaultMergeQuery.java rename to client/src/main/java/com/metamx/druid/client/indexing/ClientMergeQuery.java index 4286cd211cb..e000826ff9a 100644 --- a/client/src/main/java/com/metamx/druid/merge/ClientDefaultMergeQuery.java +++ b/client/src/main/java/com/metamx/druid/client/indexing/ClientMergeQuery.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.merge; +package com.metamx.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -30,14 +30,14 @@ import java.util.List; /** */ -public class ClientDefaultMergeQuery implements ClientMergeQuery +public class ClientMergeQuery { private final String dataSource; private final List segments; private final List aggregators; @JsonCreator - public ClientDefaultMergeQuery( + public ClientMergeQuery( @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators @@ -50,14 +50,18 @@ public class ClientDefaultMergeQuery implements ClientMergeQuery } @JsonProperty - @Override + public String getType() + { + return "append"; + } + + @JsonProperty public String getDataSource() { return dataSource; } @JsonProperty - @Override public List getSegments() { return segments; @@ -72,7 +76,7 @@ public class ClientDefaultMergeQuery implements ClientMergeQuery @Override public String toString() { - return "ClientDefaultMergeQuery{" + + return "ClientMergeQuery{" + "dataSource='" + dataSource + '\'' + ", segments=" + segments + ", aggregators=" + aggregators + diff --git a/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java b/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java new file mode 100644 index 00000000000..b659148d338 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/client/indexing/IndexingServiceClient.java @@ -0,0 +1,115 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.client.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.IAE; +import com.metamx.druid.client.DataSegment; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.InputStreamResponseHandler; +import com.netflix.curator.x.discovery.ServiceInstance; +import com.netflix.curator.x.discovery.ServiceProvider; +import org.joda.time.Interval; + + +import java.io.InputStream; +import java.net.URL; +import java.util.Iterator; +import java.util.List; + +public class IndexingServiceClient +{ + private static final InputStreamResponseHandler RESPONSE_HANDLER = new InputStreamResponseHandler(); + + private final HttpClient client; + private final ObjectMapper jsonMapper; + private final ServiceProvider serviceProvider; + + public IndexingServiceClient( + HttpClient client, + ObjectMapper jsonMapper, + ServiceProvider serviceProvider + ) + { + this.client = client; + this.jsonMapper = jsonMapper; + this.serviceProvider = serviceProvider; + } + + public void mergeSegments(List segments) + { + final Iterator segmentsIter = segments.iterator(); + if (!segmentsIter.hasNext()) { + return; + } + + final String dataSource = segmentsIter.next().getDataSource(); + while (segmentsIter.hasNext()) { + DataSegment next = segmentsIter.next(); + if (!dataSource.equals(next.getDataSource())) { + throw new IAE("Cannot merge segments of different dataSources[%s] and [%s]", dataSource, next.getDataSource()); + } + } + + runQuery("merge", new ClientAppendQuery(dataSource, segments)); + } + + public void killSegments(String dataSource, Interval interval) + { + runQuery("index", new ClientKillQuery(dataSource, interval)); + } + + public void upgradeSegment(DataSegment dataSegment) + { + runQuery("task", new ClientConversionQuery(dataSegment)); + } + + public void upgradeSegments(String dataSource, Interval interval) + { + runQuery("task", new ClientConversionQuery(dataSource, interval)); + } + + private InputStream runQuery(String endpoint, Object queryObject) + { + try { + return client.post(new URL(String.format("%s/%s", baseUrl(), endpoint))) + .setContent("application/json", jsonMapper.writeValueAsBytes(queryObject)) + .go(RESPONSE_HANDLER) + .get(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + + private String baseUrl() + { + try { + final ServiceInstance instance = serviceProvider.getInstance(); + return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java deleted file mode 100644 index 02e0a7bd141..00000000000 --- a/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.metamx.druid.merge; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.Interval; - -/** - */ -public class ClientKillQuery -{ - private final String dataSource; - private final Interval interval; - - @JsonCreator - public ClientKillQuery( - @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval - ) - { - this.dataSource = dataSource; - this.interval = interval; - } - - @JsonProperty - public String getType() - { - return "kill"; - } - - @JsonProperty - public String getDataSource() - { - return dataSource; - } - - @JsonProperty - public Interval getInterval() - { - return interval; - } -} diff --git a/client/src/main/java/com/metamx/druid/merge/ClientMergeQuery.java b/client/src/main/java/com/metamx/druid/merge/ClientMergeQuery.java deleted file mode 100644 index 3956991ed6b..00000000000 --- a/client/src/main/java/com/metamx/druid/merge/ClientMergeQuery.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merge; - -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.metamx.druid.client.DataSegment; - - - -import java.util.List; - -/** - */ -@JsonTypeInfo(use= JsonTypeInfo.Id.NAME, property="type", defaultImpl = ClientDefaultMergeQuery.class) -@JsonSubTypes(value={ - @JsonSubTypes.Type(name="append", value=ClientAppendQuery.class) -}) -public interface ClientMergeQuery -{ - public String getDataSource(); - - public List getSegments(); -} diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManager.java b/common/src/main/java/com/metamx/druid/config/ConfigManager.java new file mode 100644 index 00000000000..3073c13e9fb --- /dev/null +++ b/common/src/main/java/com/metamx/druid/config/ConfigManager.java @@ -0,0 +1,259 @@ +package com.metamx.druid.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import org.joda.time.Duration; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.IDBI; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.HandleCallback; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class ConfigManager +{ + private static final Logger log = new Logger(ConfigManager.class); + + private final Object lock = new Object(); + private boolean started = false; + + private final IDBI dbi; + private final ConfigManagerConfig config; + + private final ScheduledExecutorService exec; + private final ConcurrentMap watchedConfigs; + private final String selectStatement; + + public ConfigManager(IDBI dbi, ConfigManagerConfig config) + { + this.dbi = dbi; + this.config = config; + + this.exec = ScheduledExecutors.fixed(1, "config-manager-%s"); + this.watchedConfigs = Maps.newConcurrentMap(); + this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", config.getConfigTable()); + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + new Duration(0), + config.getPollDuration(), + new Runnable() + { + @Override + public void run() + { + poll(); + } + } + ); + + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (!started) { + return; + } + + started = false; + } + } + + private void poll() + { + for (Map.Entry entry : watchedConfigs.entrySet()) { + try { + if (entry.getValue().swapIfNew(lookup(entry.getKey()))) { + log.info("New value for key[%s] seen.", entry.getKey()); + } + } + catch (Exception e) { + log.warn(e, "Exception when checking property[%s]", entry.getKey()); + } + } + } + + @SuppressWarnings("unchecked") + public AtomicReference watchConfig(final String key, final ConfigSerde serde) + { + ConfigHolder holder = watchedConfigs.get(key); + if (holder == null) { + try { + log.info("Creating watch for key[%s]", key); + + holder = exec.submit( + new Callable>() + { + @Override + @SuppressWarnings("unchecked") + public ConfigHolder call() throws Exception + { + if (!started) { + watchedConfigs.put(key, new ConfigHolder(null, serde)); + } + else { + try { + // Multiple of these callables can be submitted at the same time, but the callables themselves + // are executed serially, so double check that it hasn't already been populated. + if (!watchedConfigs.containsKey(key)) { + byte[] value = lookup(key); + ConfigHolder holder = new ConfigHolder(value, serde); + watchedConfigs.put(key, holder); + } + } + catch (Exception e) { + log.warn(e, "Failed loading config for key[%s]", key); + watchedConfigs.put(key, new ConfigHolder(null, serde)); + } + } + + return watchedConfigs.get(key); + } + } + ).get(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + catch (ExecutionException e) { + throw Throwables.propagate(e); + } + } + + return holder.getReference(); + } + + public byte[] lookup(final String key) + { + return dbi.withHandle( + new HandleCallback() + { + @Override + public byte[] withHandle(Handle handle) throws Exception + { + return handle.createQuery(selectStatement) + .bind("name", key) + .map( + new ResultSetMapper() + { + @Override + public byte[] map(int index, ResultSet r, StatementContext ctx) throws SQLException + { + return r.getBytes("payload"); + } + } + ) + .first(); + } + } + ); + } + + public boolean set(final String key, final ConfigSerde serde, final T obj) + { + if (obj == null) { + return false; + } + + final byte[] newBytes = serde.serialize(obj); + + try { + return exec.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement("INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload") + .bind("name", key) + .bind("payload", newBytes) + .execute(); + return null; + } + } + ); + + final ConfigHolder configHolder = watchedConfigs.get(key); + if (configHolder != null) { + configHolder.swapIfNew(newBytes); + } + + return true; + } + } + ).get(); + } + catch (Exception e) { + log.warn(e, "Failed to set[%s]", key); + return false; + } + } + + private static class ConfigHolder + { + private final AtomicReference rawBytes; + private final ConfigSerde serde; + private final AtomicReference reference; + + ConfigHolder( + byte[] rawBytes, + ConfigSerde serde + ) + { + this.rawBytes = new AtomicReference(rawBytes); + this.serde = serde; + this.reference = new AtomicReference(serde.deserialize(rawBytes)); + } + + public AtomicReference getReference() + { + return reference; + } + + public boolean swapIfNew(byte[] newBytes) + { + if (!Arrays.equals(newBytes, rawBytes.get())) { + reference.set(serde.deserialize(newBytes)); + rawBytes.set(newBytes); + return true; + } + return false; + } + } +} diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java b/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java new file mode 100644 index 00000000000..24706a83fb7 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java @@ -0,0 +1,18 @@ +package com.metamx.druid.config; + +import org.joda.time.Duration; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + */ +public abstract class ConfigManagerConfig +{ + @Config("druid.indexer.configTable") + public abstract String getConfigTable(); + + @Config("druid.indexer.poll.duration") + @Default("PT1M") + public abstract Duration getPollDuration(); + +} diff --git a/common/src/main/java/com/metamx/druid/config/ConfigSerde.java b/common/src/main/java/com/metamx/druid/config/ConfigSerde.java new file mode 100644 index 00000000000..95f0a1ee7d3 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/config/ConfigSerde.java @@ -0,0 +1,9 @@ +package com.metamx.druid.config; + +/** +*/ +public interface ConfigSerde +{ + public byte[] serialize(T obj); + public T deserialize(byte[] bytes); +} diff --git a/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java b/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java new file mode 100644 index 00000000000..8e322f3ee80 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/config/JacksonConfigManager.java @@ -0,0 +1,134 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class JacksonConfigManager +{ + private final ConfigManager configManager; + private final ObjectMapper jsonMapper; + + public JacksonConfigManager( + ConfigManager configManager, + ObjectMapper jsonMapper + ) + { + this.configManager = configManager; + this.jsonMapper = jsonMapper; + } + + public AtomicReference watch(String key, Class clazz) + { + return watch(key, clazz, null); + } + + public AtomicReference watch(String key, Class clazz, T defaultVal) + { + return configManager.watchConfig(key, create(clazz, defaultVal)); + } + + public AtomicReference watch(String key, TypeReference clazz) + { + return watch(key, clazz, null); + } + + public AtomicReference watch(String key, TypeReference clazz, T defaultVal) + { + return configManager.watchConfig(key, create(clazz, defaultVal)); + } + + public boolean set(String key, T val) + { + return configManager.set(key, create(val.getClass(), null), val); + } + + private ConfigSerde create(final Class clazz, final T defaultVal) + { + return new ConfigSerde() + { + @Override + public byte[] serialize(T obj) + { + try { + return jsonMapper.writeValueAsBytes(obj); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + } + + @Override + public T deserialize(byte[] bytes) + { + if (bytes == null) { + return defaultVal; + } + + try { + return jsonMapper.readValue(bytes, clazz); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + }; + } + + private ConfigSerde create(final TypeReference clazz, final T defaultVal) + { + return new ConfigSerde() + { + @Override + public byte[] serialize(T obj) + { + try { + return jsonMapper.writeValueAsBytes(obj); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + } + + @Override + public T deserialize(byte[] bytes) + { + if (bytes == null) { + return defaultVal; + } + + try { + return jsonMapper.readValue(bytes, clazz); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + }; + } +} diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index 73013ce6aa2..b8ab7a4747e 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -65,7 +65,7 @@ public class DbConnector dbi, configTableName, String.format( - "CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))", + "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", configTableName ) ); @@ -84,12 +84,7 @@ public class DbConnector @Override public Void withHandle(Handle handle) throws Exception { - List> table = handle.select( - String.format( - "SHOW tables LIKE '%s'", - tableName - ) - ); + List> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); if (table.isEmpty()) { log.info("Creating table[%s]", tableName); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java index 48ccdda13de..5d15269677a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java @@ -44,7 +44,7 @@ import java.util.Map; /** */ -public class AppendTask extends MergeTask +public class AppendTask extends MergeTaskBase { @JsonCreator public AppendTask( diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DefaultMergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DefaultMergeTask.java deleted file mode 100644 index a46c24e91f3..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DefaultMergeTask.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.common.task; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Function; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.metamx.druid.aggregation.AggregatorFactory; -import com.metamx.druid.client.DataSegment; -import com.metamx.druid.index.QueryableIndex; -import com.metamx.druid.index.v1.IndexIO; -import com.metamx.druid.index.v1.IndexMerger; - -import javax.annotation.Nullable; -import java.io.File; -import java.util.List; -import java.util.Map; - -/** - */ -public class DefaultMergeTask extends MergeTask -{ - private final List aggregators; - - @JsonCreator - public DefaultMergeTask( - @JsonProperty("dataSource") String dataSource, - @JsonProperty("segments") List segments, - @JsonProperty("aggregations") List aggregators - ) - { - super(dataSource, segments); - this.aggregators = aggregators; - } - - @Override - public File merge(final Map segments, final File outDir) - throws Exception - { - return IndexMerger.mergeQueryableIndex( - Lists.transform( - ImmutableList.copyOf(segments.values()), - new Function() - { - @Override - public QueryableIndex apply(@Nullable File input) - { - try { - return IndexIO.loadIndex(input); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ), - aggregators.toArray(new AggregatorFactory[aggregators.size()]), - outDir - ); - } - - @Override - public String getType() - { - return "merge"; - } -} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index 561cb940639..4e6102f666b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -19,301 +19,71 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.common.base.Charsets; import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; -import com.google.common.hash.Hashing; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; +import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.merger.common.TaskLock; -import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.TaskToolbox; -import com.metamx.druid.merger.common.actions.LockListAction; -import com.metamx.druid.merger.common.actions.SegmentInsertAction; -import com.metamx.druid.shard.NoneShardSpec; -import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.service.AlertEvent; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.emitter.service.ServiceMetricEvent; -import org.joda.time.DateTime; -import org.joda.time.Interval; +import com.metamx.druid.index.QueryableIndex; +import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.index.v1.IndexMerger; + + import javax.annotation.Nullable; import java.io.File; import java.util.List; import java.util.Map; -import java.util.Set; /** */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultMergeTask.class) -@JsonSubTypes(value = { - @JsonSubTypes.Type(name = "append", value = AppendTask.class) -}) -public abstract class MergeTask extends AbstractTask +public class MergeTask extends MergeTaskBase { - private final List segments; + private final List aggregators; - private static final EmittingLogger log = new EmittingLogger(MergeTask.class); - - protected MergeTask(final String dataSource, final List segments) - { - super( - // _not_ the version, just something uniqueish - String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()), - dataSource, - computeMergedInterval(segments) - ); - - // Verify segment list is nonempty - Preconditions.checkArgument(segments.size() > 0, "segments nonempty"); - // Verify segments are all in the correct datasource - Preconditions.checkArgument( - Iterables.size( - Iterables.filter( - segments, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment segment) - { - return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource); - } - } - ) - ) == 0, "segments in the wrong datasource" - ); - // Verify segments are all unsharded - Preconditions.checkArgument( - Iterables.size( - Iterables.filter( - segments, - new Predicate() - { - @Override - public boolean apply(@Nullable DataSegment segment) - { - return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec); - } - } - ) - ) == 0, "segments without NoneShardSpec" - ); - - this.segments = segments; - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); - final ServiceEmitter emitter = toolbox.getEmitter(); - final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); - final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); - final File taskDir = toolbox.getTaskDir(); - - try { - - final long startTime = System.currentTimeMillis(); - - log.info( - "Starting merge of id[%s], segments: %s", - getId(), - Lists.transform( - segments, - new Function() - { - @Override - public String apply(@Nullable DataSegment input) - { - return input.getIdentifier(); - } - } - ) - ); - - - // download segments to merge - final Map gettedSegments = toolbox.getSegments(segments); - - // merge files together - final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged")); - - emitter.emit(builder.build("merger/numMerged", segments.size())); - emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime)); - - log.info( - "[%s] : Merged %d segments in %,d millis", - mergedSegment.getDataSource(), - segments.size(), - System.currentTimeMillis() - startTime - ); - - long uploadStart = System.currentTimeMillis(); - - // Upload file - final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment); - - emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); - emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); - - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); - - return TaskStatus.success(getId()); - } - catch (Exception e) { - log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource()) - .addData("interval", mergedSegment.getInterval()) - .emit(); - - return TaskStatus.failure(getId()); - } - } - - /** - * Checks pre-existing segments in "context" to confirm that this merge query is valid. Specifically, confirm that - * we are operating on every segment that overlaps the chosen interval. - */ - @Override - public TaskStatus preflight(TaskToolbox toolbox) - { - final Function toIdentifier = new Function() - { - @Override - public String apply(DataSegment dataSegment) - { - return dataSegment.getIdentifier(); - } - }; - - final Set current = ImmutableSet.copyOf( - Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier) - ); - final Set requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier)); - - final Set missingFromRequested = Sets.difference(current, requested); - if (!missingFromRequested.isEmpty()) { - throw new ISE( - "Merge is invalid: current segment(s) are not in the requested set: %s", - Joiner.on(", ").join(missingFromRequested) - ); - } - - final Set missingFromCurrent = Sets.difference(requested, current); - if (!missingFromCurrent.isEmpty()) { - throw new ISE( - "Merge is invalid: requested segment(s) are not in the current set: %s", - Joiner.on(", ").join(missingFromCurrent) - ); - } - - return TaskStatus.running(getId()); - - } - - protected abstract File merge(Map segments, File outDir) - throws Exception; - - @JsonProperty - public List getSegments() - { - return segments; - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("id", getId()) - .add("dataSource", getDataSource()) - .add("interval", getImplicitLockInterval()) - .add("segments", segments) - .toString(); - } - - private static String computeProcessingID(final String dataSource, final List segments) - { - final String segmentIDs = Joiner.on("_").join( - Iterables.transform( - Ordering.natural().sortedCopy(segments), new Function() - { - @Override - public String apply(DataSegment x) - { - return String.format( - "%s_%s_%s_%s", - x.getInterval().getStart(), - x.getInterval().getEnd(), - x.getVersion(), - x.getShardSpec().getPartitionNum() - ); - } - } - ) - ); - - return String.format( - "%s_%s", - dataSource, - Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase() - ); - } - - private static Interval computeMergedInterval(final List segments) - { - Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0"); - - DateTime start = null; - DateTime end = null; - - for(final DataSegment segment : segments) { - if(start == null || segment.getInterval().getStart().isBefore(start)) { - start = segment.getInterval().getStart(); - } - - if(end == null || segment.getInterval().getEnd().isAfter(end)) { - end = segment.getInterval().getEnd(); - } - } - - return new Interval(start, end); - } - - private static DataSegment computeMergedSegment( - final String dataSource, - final String version, - final List segments + @JsonCreator + public MergeTask( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("segments") List segments, + @JsonProperty("aggregations") List aggregators ) { - final Interval mergedInterval = computeMergedInterval(segments); - final Set mergedDimensions = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); - final Set mergedMetrics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + super(dataSource, segments); + this.aggregators = aggregators; + } - for (DataSegment segment : segments) { - mergedDimensions.addAll(segment.getDimensions()); - mergedMetrics.addAll(segment.getMetrics()); - } + @Override + public File merge(final Map segments, final File outDir) + throws Exception + { + return IndexMerger.mergeQueryableIndex( + Lists.transform( + ImmutableList.copyOf(segments.values()), + new Function() + { + @Override + public QueryableIndex apply(@Nullable File input) + { + try { + return IndexIO.loadIndex(input); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ), + aggregators.toArray(new AggregatorFactory[aggregators.size()]), + outDir + ); + } - return DataSegment.builder() - .dataSource(dataSource) - .interval(mergedInterval) - .version(version) - .shardSpec(new NoneShardSpec()) - .dimensions(Lists.newArrayList(mergedDimensions)) - .metrics(Lists.newArrayList(mergedMetrics)) - .build(); + @Override + public String getType() + { + return "merge"; } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java new file mode 100644 index 00000000000..e0b3dd6ff17 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java @@ -0,0 +1,315 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.task; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskLock; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.actions.LockListAction; +import com.metamx.druid.merger.common.actions.SegmentInsertAction; +import com.metamx.druid.shard.NoneShardSpec; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.AlertEvent; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + */ +public abstract class MergeTaskBase extends AbstractTask +{ + private final List segments; + + private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class); + + protected MergeTaskBase(final String dataSource, final List segments) + { + super( + // _not_ the version, just something uniqueish + String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()), + dataSource, + computeMergedInterval(segments) + ); + + // Verify segment list is nonempty + Preconditions.checkArgument(segments.size() > 0, "segments nonempty"); + // Verify segments are all in the correct datasource + Preconditions.checkArgument( + Iterables.size( + Iterables.filter( + segments, + new Predicate() + { + @Override + public boolean apply(@Nullable DataSegment segment) + { + return segment == null || !segment.getDataSource().equalsIgnoreCase(dataSource); + } + } + ) + ) == 0, "segments in the wrong datasource" + ); + // Verify segments are all unsharded + Preconditions.checkArgument( + Iterables.size( + Iterables.filter( + segments, + new Predicate() + { + @Override + public boolean apply(@Nullable DataSegment segment) + { + return segment == null || !(segment.getShardSpec() instanceof NoneShardSpec); + } + } + ) + ) == 0, "segments without NoneShardSpec" + ); + + this.segments = segments; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction())); + final ServiceEmitter emitter = toolbox.getEmitter(); + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments); + final File taskDir = toolbox.getTaskDir(); + + try { + + final long startTime = System.currentTimeMillis(); + + log.info( + "Starting merge of id[%s], segments: %s", + getId(), + Lists.transform( + segments, + new Function() + { + @Override + public String apply(@Nullable DataSegment input) + { + return input.getIdentifier(); + } + } + ) + ); + + + // download segments to merge + final Map gettedSegments = toolbox.getSegments(segments); + + // merge files together + final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged")); + + emitter.emit(builder.build("merger/numMerged", segments.size())); + emitter.emit(builder.build("merger/mergeTime", System.currentTimeMillis() - startTime)); + + log.info( + "[%s] : Merged %d segments in %,d millis", + mergedSegment.getDataSource(), + segments.size(), + System.currentTimeMillis() - startTime + ); + + long uploadStart = System.currentTimeMillis(); + + // Upload file + final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment); + + emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); + emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); + + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment))); + + return TaskStatus.success(getId()); + } + catch (Exception e) { + log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource()) + .addData("interval", mergedSegment.getInterval()) + .emit(); + + return TaskStatus.failure(getId()); + } + } + + /** + * Checks pre-existing segments in "context" to confirm that this merge query is valid. Specifically, confirm that + * we are operating on every segment that overlaps the chosen interval. + */ + @Override + public TaskStatus preflight(TaskToolbox toolbox) + { + final Function toIdentifier = new Function() + { + @Override + public String apply(DataSegment dataSegment) + { + return dataSegment.getIdentifier(); + } + }; + + final Set current = ImmutableSet.copyOf( + Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier) + ); + final Set requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier)); + + final Set missingFromRequested = Sets.difference(current, requested); + if (!missingFromRequested.isEmpty()) { + throw new ISE( + "Merge is invalid: current segment(s) are not in the requested set: %s", + Joiner.on(", ").join(missingFromRequested) + ); + } + + final Set missingFromCurrent = Sets.difference(requested, current); + if (!missingFromCurrent.isEmpty()) { + throw new ISE( + "Merge is invalid: requested segment(s) are not in the current set: %s", + Joiner.on(", ").join(missingFromCurrent) + ); + } + + return TaskStatus.running(getId()); + + } + + protected abstract File merge(Map segments, File outDir) + throws Exception; + + @JsonProperty + public List getSegments() + { + return segments; + } + + @Override + public String toString() + { + return Objects.toStringHelper(this) + .add("id", getId()) + .add("dataSource", getDataSource()) + .add("interval", getImplicitLockInterval()) + .add("segments", segments) + .toString(); + } + + private static String computeProcessingID(final String dataSource, final List segments) + { + final String segmentIDs = Joiner.on("_").join( + Iterables.transform( + Ordering.natural().sortedCopy(segments), new Function() + { + @Override + public String apply(DataSegment x) + { + return String.format( + "%s_%s_%s_%s", + x.getInterval().getStart(), + x.getInterval().getEnd(), + x.getVersion(), + x.getShardSpec().getPartitionNum() + ); + } + } + ) + ); + + return String.format( + "%s_%s", + dataSource, + Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase() + ); + } + + private static Interval computeMergedInterval(final List segments) + { + Preconditions.checkArgument(segments.size() > 0, "segments.size() > 0"); + + DateTime start = null; + DateTime end = null; + + for(final DataSegment segment : segments) { + if(start == null || segment.getInterval().getStart().isBefore(start)) { + start = segment.getInterval().getStart(); + } + + if(end == null || segment.getInterval().getEnd().isAfter(end)) { + end = segment.getInterval().getEnd(); + } + } + + return new Interval(start, end); + } + + private static DataSegment computeMergedSegment( + final String dataSource, + final String version, + final List segments + ) + { + final Interval mergedInterval = computeMergedInterval(segments); + final Set mergedDimensions = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + final Set mergedMetrics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + + for (DataSegment segment : segments) { + mergedDimensions.addAll(segment.getDimensions()); + mergedMetrics.addAll(segment.getMetrics()); + } + + return DataSegment.builder() + .dataSource(dataSource) + .interval(mergedInterval) + .version(version) + .shardSpec(new NoneShardSpec()) + .dimensions(Lists.newArrayList(mergedDimensions)) + .metrics(Lists.newArrayList(mergedMetrics)) + .build(); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index 8418ecf40a8..5f288be99dc 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -41,10 +41,10 @@ import org.joda.time.Interval; * to release locks early if they desire. * */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultMergeTask.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(name = "append", value = AppendTask.class), - @JsonSubTypes.Type(name = "merge", value = DefaultMergeTask.class), + @JsonSubTypes.Type(name = "merge", value = MergeTask.class), @JsonSubTypes.Type(name = "delete", value = DeleteTask.class), @JsonSubTypes.Type(name = "kill", value = KillTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class), @@ -52,7 +52,7 @@ import org.joda.time.Interval; @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class), @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), @JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class), - @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class) + @JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterTask.SubTask.class) }) public interface Task { diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java deleted file mode 100644 index 2099d903d33..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterSubTask.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.common.task; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Sets; -import com.metamx.common.logger.Logger; -import com.metamx.druid.client.DataSegment; -import com.metamx.druid.index.v1.IndexIO; -import com.metamx.druid.merger.common.TaskStatus; -import com.metamx.druid.merger.common.TaskToolbox; -import com.metamx.druid.merger.common.actions.SegmentInsertAction; -import org.joda.time.DateTime; - -import java.io.File; -import java.util.Arrays; -import java.util.Map; - -/** - */ -public class VersionConverterSubTask extends AbstractTask -{ - private static final Logger log = new Logger(VersionConverterSubTask.class); - - private final DataSegment segment; - - protected VersionConverterSubTask( - @JsonProperty("groupId") String groupId, - @JsonProperty("segment") DataSegment segment - ) - { - super( - joinId( - groupId, - "sub", - segment.getInterval().getStart(), - segment.getInterval().getEnd(), - segment.getShardSpec().getPartitionNum() - ), - groupId, - segment.getDataSource(), - segment.getInterval() - ); - this.segment = segment; - } - - @Override - public String getType() - { - return "version_converter_sub"; - } - - @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - log.info("Converting segment[%s]", segment); - final Map localSegments = toolbox.getSegments(Arrays.asList(segment)); - - final File location = localSegments.get(segment); - final File outLocation = new File(location, "v9_out"); - if (IndexIO.convertSegment(location, outLocation)) { - final int outVersion = IndexIO.getVersionFromDir(outLocation); - - // Appending to the version makes a new version that inherits most comparability parameters of the original - // version, but is "newer" than said original version. - DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion)); - updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment); - - toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); - } - else { - log.info("Conversion failed."); - } - - return success(); - } -} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index 3859190a1bb..c8c0e2cbf42 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -22,19 +22,26 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.actions.SegmentInsertAction; import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.TaskActionClient; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Map; /** */ @@ -44,10 +51,12 @@ public class VersionConverterTask extends AbstractTask private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID); private static final Logger log = new Logger(VersionConverterTask.class); + private final DataSegment segment; public VersionConverterTask( @JsonProperty("dataSource") String dataSource, - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("segment") DataSegment segment ) { super( @@ -55,6 +64,8 @@ public class VersionConverterTask extends AbstractTask dataSource, interval ); + + this.segment = segment; } @Override @@ -66,12 +77,22 @@ public class VersionConverterTask extends AbstractTask @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - throw new ISE("Should never run, %s just exists to create subtasks", this.getClass().getSimpleName()); + if (segment == null) { + throw new ISE("Segment was null, this should never run.", this.getClass().getSimpleName()); + } + + log.info("I'm in a subless mood."); + convertSegment(toolbox, segment); + return success(); } @Override public TaskStatus preflight(TaskToolbox toolbox) throws Exception { + if (segment != null) { + return super.preflight(toolbox); + } + final TaskActionClient taskClient = toolbox.getTaskActionClientFactory(); List segments = taskClient.submit(defaultListUsedAction()); @@ -86,7 +107,7 @@ public class VersionConverterTask extends AbstractTask { final Integer segmentVersion = segment.getBinaryVersion(); if (!CURR_VERSION_INTEGER.equals(segmentVersion)) { - return new VersionConverterSubTask(getGroupId(), segment); + return new SubTask(getGroupId(), segment); } log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion); @@ -99,4 +120,65 @@ public class VersionConverterTask extends AbstractTask return TaskStatus.success(getId()); } + + public static class SubTask extends AbstractTask + { + private final DataSegment segment; + + protected SubTask( + @JsonProperty("groupId") String groupId, + @JsonProperty("segment") DataSegment segment + ) + { + super( + joinId( + groupId, + "sub", + segment.getInterval().getStart(), + segment.getInterval().getEnd(), + segment.getShardSpec().getPartitionNum() + ), + groupId, + segment.getDataSource(), + segment.getInterval() + ); + this.segment = segment; + } + + @Override + public String getType() + { + return "version_converter_sub"; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + log.info("Subs are good! Italian BMT and Meatball are probably my favorite."); + convertSegment(toolbox, segment); + return success(); + } + } + + private static void convertSegment(TaskToolbox toolbox, final DataSegment segment) + throws SegmentLoadingException, IOException + { + log.info("Converting segment[%s]", segment); + final Map localSegments = toolbox.getSegments(Arrays.asList(segment)); + + final File location = localSegments.get(segment); + final File outLocation = new File(location, "v9_out"); + if (IndexIO.convertSegment(location, outLocation)) { + final int outVersion = IndexIO.getVersionFromDir(outLocation); + + // Appending to the version makes a new version that inherits most comparability parameters of the original + // version, but is "newer" than said original version. + DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion)); + updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment); + + toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); + } else { + log.info("Conversion failed."); + } + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java deleted file mode 100644 index 16eeb1c3439..00000000000 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/WorkerSetupManagerConfig.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.merger.coordinator.config; - -import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; - -/** - */ -public abstract class WorkerSetupManagerConfig -{ - @Config("druid.indexer.configTable") - public abstract String getConfigTable(); - - @Config("druid.indexer.workerSetupConfigName") - public abstract String getWorkerSetupConfigName(); - - @Config("druid.indexer.poll.duration") - @Default("PT1M") - public abstract Duration getPollDuration(); -} diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 286d2e1649a..2112e3b6e37 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -39,6 +39,9 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.RegisteringNode; +import com.metamx.druid.config.ConfigManager; +import com.metamx.druid.config.ConfigManagerConfig; +import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.GuiceServletConfig; @@ -78,7 +81,6 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig; import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig; -import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; import com.metamx.druid.merger.coordinator.scaling.AutoScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy; import com.metamx.druid.merger.coordinator.scaling.NoopAutoScalingStrategy; @@ -566,18 +568,12 @@ public class IndexerCoordinatorNode extends RegisteringNode public void initializeWorkerSetupManager() { if (workerSetupManager == null) { - final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class); + final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class); + final ConfigManager configManager = new ConfigManager(dbi, configManagerConfig); + lifecycle.addManagedInstance(configManager); - DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable()); - workerSetupManager = new WorkerSetupManager( - dbi, Executors.newScheduledThreadPool( - 1, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("WorkerSetupManagerExec--%d") - .build() - ), jsonMapper, workerSetupManagerConfig - ); + DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable()); + workerSetupManager = new WorkerSetupManager(new JacksonConfigManager(configManager, jsonMapper)); } lifecycle.addManagedInstance(workerSetupManager); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 5f0714a8d61..d2d613c3ea3 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -98,18 +98,15 @@ public class IndexerCoordinatorResource @Produces("application/json") public Response doIndex(final Task task) { - // verify against whitelist - if (config.isWhitelistEnabled() && !config.getWhitelistDatasources().contains(task.getDataSource())) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - ImmutableMap.of( - "error", - String.format("dataSource[%s] is not whitelisted", task.getDataSource()) - ) - ) - .build(); - } + return taskPost(task); + } + @POST + @Path("/task") + @Consumes("application/json") + @Produces("application/json") + public Response taskPost(final Task task) + { taskMasterLifecycle.getTaskQueue().add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java index 666bbaa998c..70c15bcada0 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/ScalingStats.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.merger.coordinator.scaling; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java index 89a0dd2d5c1..b9ce066c327 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupManager.java @@ -19,209 +19,41 @@ package com.metamx.druid.merger.coordinator.setup; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.metamx.common.ISE; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.logger.Logger; -import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig; -import org.apache.commons.collections.MapUtils; +import com.metamx.druid.config.JacksonConfigManager; -import org.joda.time.Duration; -import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.FoldController; -import org.skife.jdbi.v2.Folder3; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.StatementContext; -import org.skife.jdbi.v2.tweak.HandleCallback; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; /** */ public class WorkerSetupManager { - private static final Logger log = new Logger(WorkerSetupManager.class); + private static final String WORKER_SETUP_KEY = "worker.setup"; - private final DBI dbi; - private final ObjectMapper jsonMapper; - private final ScheduledExecutorService exec; - private final WorkerSetupManagerConfig config; - - private final Object lock = new Object(); + private final JacksonConfigManager configManager; private volatile AtomicReference workerSetupData = new AtomicReference(null); - private volatile boolean started = false; public WorkerSetupManager( - DBI dbi, - ScheduledExecutorService exec, - ObjectMapper jsonMapper, - WorkerSetupManagerConfig config + JacksonConfigManager configManager ) { - this.dbi = dbi; - this.exec = exec; - this.jsonMapper = jsonMapper; - this.config = config; + this.configManager = configManager; } @LifecycleStart public void start() { - synchronized (lock) { - if (started) { - return; - } - - ScheduledExecutors.scheduleWithFixedDelay( - exec, - new Duration(0), - config.getPollDuration(), - new Runnable() - { - @Override - public void run() - { - poll(); - } - } - ); - - started = true; - } + workerSetupData = configManager.watch(WORKER_SETUP_KEY, WorkerSetupData.class); } - @LifecycleStop - public void stop() - { - synchronized (lock) { - if (!started) { - return; - } - - started = false; - } - } - - public void poll() - { - try { - List setupDataList = dbi.withHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception - { - return handle.createQuery( - String.format( - "SELECT payload FROM %s WHERE name = :name", - config.getConfigTable() - ) - ) - .bind("name", config.getWorkerSetupConfigName()) - .fold( - Lists.newArrayList(), - new Folder3, Map>() - { - @Override - public ArrayList fold( - ArrayList workerNodeConfigurations, - Map stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - // stringObjectMap lowercases and jackson may fail serde - workerNodeConfigurations.add( - jsonMapper.readValue( - MapUtils.getString(stringObjectMap, "payload"), - WorkerSetupData.class - ) - ); - return workerNodeConfigurations; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); - } - } - ); - - if (setupDataList.isEmpty()) { - throw new ISE("WTF?! No configuration found for worker nodes!"); - } else if (setupDataList.size() != 1) { - throw new ISE("WTF?! Found more than one configuration for worker nodes"); - } - - workerSetupData.set(setupDataList.get(0)); - } - catch (Exception e) { - log.error(e, "Exception while polling for worker setup data!"); - } - } - - @SuppressWarnings("unchecked") public WorkerSetupData getWorkerSetupData() { - synchronized (lock) { - if (!started) { - throw new ISE("Must start WorkerSetupManager first!"); - } - - return workerSetupData.get(); - } + return workerSetupData.get(); } public boolean setWorkerSetupData(final WorkerSetupData value) { - synchronized (lock) { - try { - if (!started) { - throw new ISE("Must start WorkerSetupManager first!"); - } - - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - String.format( - "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", - config.getConfigTable() - ) - ) - .bind("name", config.getWorkerSetupConfigName()) - .bind("payload", jsonMapper.writeValueAsString(value)) - .execute(); - - return null; - } - } - ); - - workerSetupData.set(value); - } - catch (Exception e) { - log.error(e, "Exception updating worker config"); - return false; - } - } - - return true; + return configManager.set(WORKER_SETUP_KEY, value); } } diff --git a/merger/src/test/java/com/metamx/druid/merger/TestTask.java b/merger/src/test/java/com/metamx/druid/merger/TestTask.java index c23b498f739..d0a77cff447 100644 --- a/merger/src/test/java/com/metamx/druid/merger/TestTask.java +++ b/merger/src/test/java/com/metamx/druid/merger/TestTask.java @@ -26,14 +26,14 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; -import com.metamx.druid.merger.common.task.DefaultMergeTask; +import com.metamx.druid.merger.common.task.MergeTask; import java.util.List; /** */ @JsonTypeName("test") -public class TestTask extends DefaultMergeTask +public class TestTask extends MergeTask { private final String id; private final TaskStatus status; diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java similarity index 89% rename from merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskTest.java rename to merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java index 7c779f6a74c..a2f6e8175fb 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java @@ -31,7 +31,7 @@ import java.io.File; import java.util.List; import java.util.Map; -public class MergeTaskTest +public class MergeTaskBaseTest { private final DataSegment.Builder segmentBuilder = DataSegment.builder() .dataSource("foo") @@ -43,7 +43,7 @@ public class MergeTaskTest .add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build()) .build(); - final MergeTask testMergeTask = new MergeTask("foo", segments) + final MergeTaskBase testMergeTaskBase = new MergeTaskBase("foo", segments) { @Override protected File merge(Map segments, File outDir) throws Exception @@ -61,13 +61,13 @@ public class MergeTaskTest @Test public void testDataSource() { - Assert.assertEquals("foo", testMergeTask.getDataSource()); + Assert.assertEquals("foo", testMergeTaskBase.getDataSource()); } @Test public void testInterval() { - Assert.assertEquals(new Interval("2012-01-03/2012-01-07"), testMergeTask.getImplicitLockInterval().get()); + Assert.assertEquals(new Interval("2012-01-03/2012-01-07"), testMergeTaskBase.getImplicitLockInterval().get()); } @Test @@ -81,7 +81,7 @@ public class MergeTaskTest ).toString().toLowerCase() + "_"; Assert.assertEquals( desiredPrefix, - testMergeTask.getId().substring(0, desiredPrefix.length()) + testMergeTaskBase.getId().substring(0, desiredPrefix.length()) ); } } diff --git a/pom.xml b/pom.xml index 59a91567adb..078927877c6 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ UTF-8 - 0.20.0 + 0.20.1-SNAPSHOT diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index e434bff995d..acd94438d84 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -34,8 +34,8 @@ import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.master.DruidMaster; +import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.master.rules.Rule; -import com.metamx.druid.merge.ClientKillQuery; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -66,6 +66,7 @@ public class InfoResource private final DatabaseSegmentManager databaseSegmentManager; private final DatabaseRuleManager databaseRuleManager; private final DruidClusterInfo druidClusterInfo; + private final IndexingServiceClient indexingServiceClient; @Inject public InfoResource( @@ -73,7 +74,8 @@ public class InfoResource ServerInventoryManager serverInventoryManager, DatabaseSegmentManager databaseSegmentManager, DatabaseRuleManager databaseRuleManager, - DruidClusterInfo druidClusterInfo + DruidClusterInfo druidClusterInfo, + IndexingServiceClient indexingServiceClient ) { this.master = master; @@ -81,6 +83,7 @@ public class InfoResource this.databaseSegmentManager = databaseSegmentManager; this.databaseRuleManager = databaseRuleManager; this.druidClusterInfo = druidClusterInfo; + this.indexingServiceClient = indexingServiceClient; } @GET @@ -374,7 +377,7 @@ public class InfoResource ) { if (kill != null && Boolean.valueOf(kill)) { - master.killSegments(new ClientKillQuery(dataSourceName, new Interval(interval))); + indexingServiceClient.killSegments(dataSourceName, new Interval(interval)); } else { if (!databaseSegmentManager.removeDatasource(dataSourceName)) { return Response.status(Response.Status.NOT_FOUND).build(); diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 64e63176d0c..32e03e7ba63 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -33,6 +33,9 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.client.ServerInventoryManager; import com.metamx.druid.client.ServerInventoryManagerConfig; +import com.metamx.druid.config.ConfigManager; +import com.metamx.druid.config.ConfigManagerConfig; +import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.coordination.DruidClusterInfoConfig; import com.metamx.druid.db.DatabaseRuleManager; @@ -49,6 +52,7 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.log.LogLevelAdjuster; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMasterConfig; +import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.master.LoadQueuePeon; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; @@ -86,7 +90,7 @@ import java.util.concurrent.ScheduledExecutorService; */ public class MasterMain { - private static final Logger log = new Logger(ServerMain.class); + private static final Logger log = new Logger(MasterMain.class); public static void main(String[] args) throws Exception { @@ -166,13 +170,14 @@ public class MasterMain lifecycle ); - ServiceProvider serviceProvider = null; + IndexingServiceClient indexingServiceClient = null; if (druidMasterConfig.getMergerServiceName() != null) { - serviceProvider = Initialization.makeServiceProvider( + ServiceProvider serviceProvider = Initialization.makeServiceProvider( druidMasterConfig.getMergerServiceName(), serviceDiscovery, lifecycle ); + indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider); } final DruidClusterInfo druidClusterInfo = new DruidClusterInfo( @@ -180,10 +185,14 @@ public class MasterMain masterYp ); + JacksonConfigManager configManager = new JacksonConfigManager( + new ConfigManager(dbi, configFactory.build(ConfigManagerConfig.class)), jsonMapper + ); + final DruidMaster master = new DruidMaster( druidMasterConfig, druidClusterInfo, - jsonMapper, + configManager, databaseSegmentManager, serverInventoryManager, databaseRuleManager, @@ -191,9 +200,7 @@ public class MasterMain emitter, scheduledExecutorFactory, new ConcurrentHashMap(), - serviceProvider, - httpClient, - new ToStringResponseHandler(Charsets.UTF_8) + indexingServiceClient ); lifecycle.addManagedInstance(master); @@ -226,7 +233,8 @@ public class MasterMain databaseRuleManager, druidClusterInfo, master, - jsonMapper + jsonMapper, + indexingServiceClient ) ); diff --git a/server/src/main/java/com/metamx/druid/http/MasterResource.java b/server/src/main/java/com/metamx/druid/http/MasterResource.java index b725ed7f358..9bb59d79d43 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterResource.java +++ b/server/src/main/java/com/metamx/druid/http/MasterResource.java @@ -21,7 +21,6 @@ package com.metamx.druid.http; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.LoadPeonCallback; -import com.metamx.druid.merge.ClientKillQuery; import javax.inject.Inject; import javax.ws.rs.Consumes; diff --git a/server/src/main/java/com/metamx/druid/http/MasterServletModule.java b/server/src/main/java/com/metamx/druid/http/MasterServletModule.java index 47395f73eeb..64d0c98afa3 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterServletModule.java +++ b/server/src/main/java/com/metamx/druid/http/MasterServletModule.java @@ -27,6 +27,7 @@ import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.master.DruidMaster; +import com.metamx.druid.client.indexing.IndexingServiceClient; import com.sun.jersey.guice.JerseyServletModule; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; @@ -44,6 +45,7 @@ public class MasterServletModule extends JerseyServletModule private final DruidClusterInfo druidClusterInfo; private final DruidMaster master; private final ObjectMapper jsonMapper; + private final IndexingServiceClient indexingServiceClient; public MasterServletModule( ServerInventoryManager serverInventoryManager, @@ -51,7 +53,8 @@ public class MasterServletModule extends JerseyServletModule DatabaseRuleManager databaseRuleManager, DruidClusterInfo druidClusterInfo, DruidMaster master, - ObjectMapper jsonMapper + ObjectMapper jsonMapper, + IndexingServiceClient indexingServiceClient ) { this.serverInventoryManager = serverInventoryManager; @@ -60,6 +63,7 @@ public class MasterServletModule extends JerseyServletModule this.druidClusterInfo = druidClusterInfo; this.master = master; this.jsonMapper = jsonMapper; + this.indexingServiceClient = indexingServiceClient; } @Override @@ -72,6 +76,7 @@ public class MasterServletModule extends JerseyServletModule bind(DatabaseRuleManager.class).toInstance(databaseRuleManager); bind(DruidMaster.class).toInstance(master); bind(DruidClusterInfo.class).toInstance(druidClusterInfo); + bind(IndexingServiceClient.class).toInstance(indexingServiceClient); serve("/*").with(GuiceContainer.class); } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index a21408144b3..74b9d17d57b 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -19,10 +19,8 @@ package com.metamx.druid.master; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -41,32 +39,30 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.ServerInventoryManager; +import com.metamx.druid.client.indexing.IndexingServiceClient; +import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseSegmentManager; -import com.metamx.druid.merge.ClientKillQuery; +import com.metamx.druid.index.v1.IndexIO; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.phonebook.PhoneBook; import com.metamx.phonebook.PhoneBookPeon; -import com.netflix.curator.x.discovery.ServiceProvider; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; -import java.net.URL; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -83,26 +79,22 @@ public class DruidMaster private final DruidMasterConfig config; private final DruidClusterInfo clusterInfo; + private final JacksonConfigManager configManager; private final DatabaseSegmentManager databaseSegmentManager; private final ServerInventoryManager serverInventoryManager; private final DatabaseRuleManager databaseRuleManager; private final PhoneBook yp; private final ServiceEmitter emitter; + private final IndexingServiceClient indexingServiceClient; private final ScheduledExecutorService exec; private final ScheduledExecutorService peonExec; private final PhoneBookPeon masterPeon; private final Map loadManagementPeons; - private final ServiceProvider serviceProvider; - - private final HttpClient httpClient; - private final HttpResponseHandler responseHandler; - - private final ObjectMapper jsonMapper; public DruidMaster( DruidMasterConfig config, DruidClusterInfo clusterInfo, - ObjectMapper jsonMapper, + JacksonConfigManager configManager, DatabaseSegmentManager databaseSegmentManager, ServerInventoryManager serverInventoryManager, DatabaseRuleManager databaseRuleManager, @@ -110,31 +102,25 @@ public class DruidMaster ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, Map loadManagementPeons, - ServiceProvider serviceProvider, - HttpClient httpClient, - HttpResponseHandler responseHandler + IndexingServiceClient indexingServiceClient ) { this.config = config; this.clusterInfo = clusterInfo; - - this.jsonMapper = jsonMapper; + this.configManager = configManager; this.databaseSegmentManager = databaseSegmentManager; this.serverInventoryManager = serverInventoryManager; this.databaseRuleManager = databaseRuleManager; this.yp = zkPhoneBook; this.emitter = emitter; + this.indexingServiceClient = indexingServiceClient; this.masterPeon = new MasterListeningPeon(); this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.peonExec = scheduledExecutorFactory.create(1, "Master-PeonExec--%d"); this.loadManagementPeons = loadManagementPeons; - - this.serviceProvider = serviceProvider; - this.httpClient = httpClient; - this.responseHandler = responseHandler; } public boolean isClusterMaster() @@ -349,27 +335,6 @@ public class DruidMaster } } - public void killSegments(ClientKillQuery killQuery) - { - try { - httpClient.post( - new URL( - String.format( - "http://%s:%s/mmx/merger/v1/index", - serviceProvider.getInstance().getAddress(), - serviceProvider.getInstance().getPort() - ) - ) - ) - .setContent("application/json", jsonMapper.writeValueAsBytes(killQuery)) - .go(responseHandler) - .get(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - public Set getAvailableDataSegments() { Set availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator())); @@ -390,7 +355,9 @@ public class DruidMaster for (DataSegment dataSegment : dataSegments) { if (dataSegment.getSize() < 0) { - log.warn("No size on Segment[%s], wtf?", dataSegment); + log.makeAlert("No size on Segment, wtf?") + .addData("segment", dataSegment) + .emit(); } availableSegments.add(dataSegment); } @@ -466,8 +433,14 @@ public class DruidMaster final List> masterRunnables = Lists.newArrayList(); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); - if (config.isMergeSegments() && serviceProvider != null) { - masterRunnables.add(Pair.of(new MasterSegmentMergerRunnable(), config.getMasterSegmentMergerPeriod())); + if (config.isMergeSegments() && indexingServiceClient != null) { + + masterRunnables.add( + Pair.of( + new MasterSegmentMergerRunnable(configManager.watch(MergerWhitelist.CONFIG_KEY, MergerWhitelist.class)), + config.getMasterSegmentMergerPeriod() + ) + ); } for (final Pair masterRunnable : masterRunnables) { @@ -529,6 +502,39 @@ public class DruidMaster } } + public static class DruidMasterVersionConverter implements DruidMasterHelper + { + private final IndexingServiceClient indexingServiceClient; + private final AtomicReference whitelistRef; + + public DruidMasterVersionConverter( + IndexingServiceClient indexingServiceClient, + AtomicReference whitelistRef + ) + { + this.indexingServiceClient = indexingServiceClient; + this.whitelistRef = whitelistRef; + } + + @Override + public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) + { + MergerWhitelist whitelist = whitelistRef.get(); + + for (DataSegment dataSegment : params.getAvailableSegments()) { + if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) { + final Integer binaryVersion = dataSegment.getBinaryVersion(); + + if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) { + indexingServiceClient.upgradeSegment(dataSegment); + } + } + } + + return params; + } + } + private class MasterListeningPeon implements PhoneBookPeon { @Override @@ -723,12 +729,13 @@ public class DruidMaster private class MasterSegmentMergerRunnable extends MasterRunnable { - private MasterSegmentMergerRunnable() + private MasterSegmentMergerRunnable(final AtomicReference whitelistRef) { super( ImmutableList.of( new DruidMasterSegmentInfoLoader(DruidMaster.this), - new DruidMasterSegmentMerger(jsonMapper, serviceProvider), + new DruidMasterVersionConverter(indexingServiceClient, whitelistRef), + new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef), new DruidMasterHelper() { @Override @@ -739,8 +746,7 @@ public class DruidMaster params.getEmitter().emit( new ServiceMetricEvent.Builder().build( - "master/merge/count", - stats.getGlobalStats().get("mergedCount") + "master/merge/count", stats.getGlobalStats().get("mergedCount") ) ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java index f3d93d0e6fd..ef4f9d1e6d3 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java @@ -19,8 +19,6 @@ package com.metamx.druid.master; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; @@ -32,22 +30,19 @@ import com.google.common.collect.Multiset; import com.google.common.collect.Ordering; import com.metamx.common.Pair; import com.metamx.common.guava.FunctionalIterable; -import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.partition.PartitionChunk; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; -import com.metamx.http.client.response.ToStringResponseHandler; -import com.netflix.curator.x.discovery.ServiceProvider; import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -55,44 +50,40 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper { private static final Logger log = new Logger(DruidMasterSegmentMerger.class); - private final MergerClient mergerClient; + private final IndexingServiceClient indexingServiceClient; + private final AtomicReference whiteListRef; - public DruidMasterSegmentMerger(MergerClient mergerClient) + public DruidMasterSegmentMerger( + IndexingServiceClient indexingServiceClient, + AtomicReference whitelistRef + ) { - this.mergerClient = mergerClient; - } - - public DruidMasterSegmentMerger(ObjectMapper jsonMapper, ServiceProvider serviceProvider) - { - this.mergerClient = new HttpMergerClient( - HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), - new Lifecycle() - ), - new ToStringResponseHandler(Charsets.UTF_8), - jsonMapper, - serviceProvider - ); + this.indexingServiceClient = indexingServiceClient; + this.whiteListRef = whitelistRef; } @Override public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params) { + MergerWhitelist whitelist = whiteListRef.get(); + MasterStats stats = new MasterStats(); Map> dataSources = Maps.newHashMap(); // Find serviced segments by using a timeline for (DataSegment dataSegment : params.getAvailableSegments()) { - VersionedIntervalTimeline timeline = dataSources.get(dataSegment.getDataSource()); - if (timeline == null) { - timeline = new VersionedIntervalTimeline(Ordering.natural()); - dataSources.put(dataSegment.getDataSource(), timeline); + if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) { + VersionedIntervalTimeline timeline = dataSources.get(dataSegment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline(Ordering.natural()); + dataSources.put(dataSegment.getDataSource(), timeline); + } + timeline.add( + dataSegment.getInterval(), + dataSegment.getVersion(), + dataSegment.getShardSpec().createChunk(dataSegment) + ); } - timeline.add( - dataSegment.getInterval(), - dataSegment.getVersion(), - dataSegment.getShardSpec().createChunk(dataSegment) - ); } // Find segments to merge @@ -161,7 +152,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper log.info("[%s] Found %d segments to merge %s", dataSource, segments.size(), segmentNames); try { - mergerClient.runRequest(dataSource, segments); + indexingServiceClient.mergeSegments(segments); } catch (Exception e) { log.error( diff --git a/server/src/main/java/com/metamx/druid/master/HttpMergerClient.java b/server/src/main/java/com/metamx/druid/master/HttpMergerClient.java deleted file mode 100644 index 07a0f8dba6e..00000000000 --- a/server/src/main/java/com/metamx/druid/master/HttpMergerClient.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.master; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; -import com.metamx.druid.client.DataSegment; -import com.metamx.druid.merge.ClientAppendQuery; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.response.HttpResponseHandler; -import com.netflix.curator.x.discovery.ServiceProvider; - - -import java.net.URL; -import java.util.List; - -public class HttpMergerClient implements MergerClient -{ - private final HttpClient client; - private final HttpResponseHandler responseHandler; - private final ObjectMapper jsonMapper; - private final ServiceProvider serviceProvider; - - public HttpMergerClient( - HttpClient client, - HttpResponseHandler responseHandler, - ObjectMapper jsonMapper, - ServiceProvider serviceProvider - ) - { - this.client = client; - this.responseHandler = responseHandler; - this.jsonMapper = jsonMapper; - this.serviceProvider = serviceProvider; - } - - public void runRequest(String dataSource, List segments) - { - try { - byte[] dataToSend = jsonMapper.writeValueAsBytes( - new ClientAppendQuery(dataSource, segments) - ); - - client.post( - new URL( - String.format( - "http://%s:%s/mmx/merger/v1/merge", - serviceProvider.getInstance().getAddress(), - serviceProvider.getInstance().getPort() - ) - ) - ) - .setContent("application/json", dataToSend) - .go(responseHandler) - .get(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } -} diff --git a/server/src/main/java/com/metamx/druid/master/MergerClient.java b/server/src/main/java/com/metamx/druid/master/MergerWhitelist.java similarity index 57% rename from server/src/main/java/com/metamx/druid/master/MergerClient.java rename to server/src/main/java/com/metamx/druid/master/MergerWhitelist.java index c2556ccaea0..bd55a0cf057 100644 --- a/server/src/main/java/com/metamx/druid/master/MergerClient.java +++ b/server/src/main/java/com/metamx/druid/master/MergerWhitelist.java @@ -19,13 +19,36 @@ package com.metamx.druid.master; -import com.metamx.druid.client.DataSegment; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.common.collect.Sets; import java.util.List; +import java.util.Set; /** */ -public interface MergerClient +public class MergerWhitelist { - public void runRequest(String dataSource, List segments); + public static final String CONFIG_KEY = "merger.whitelist"; + + private final Set dataSources; + + @JsonCreator + public MergerWhitelist(Set dataSources) + { + this.dataSources = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + this.dataSources.addAll(dataSources); + } + + @JsonValue + public Set getDataSources() + { + return dataSources; + } + + public boolean contains(String val) + { + return dataSources.contains(val); + } } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java index 8e19ed5d330..1ad1f96d163 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java @@ -23,12 +23,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.client.indexing.IndexingServiceClient; import junit.framework.Assert; import org.joda.time.Interval; import org.junit.Test; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; public class DruidMasterSegmentMergerTest { @@ -367,11 +369,7 @@ public class DruidMasterSegmentMergerTest DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() ); - Assert.assertEquals( - ImmutableList.of( - ImmutableList.of(segments.get(4), segments.get(5)) - ), merge(segments) - ); + Assert.assertEquals(ImmutableList.of(ImmutableList.of(segments.get(4), segments.get(5))), merge(segments)); } /** @@ -380,16 +378,17 @@ public class DruidMasterSegmentMergerTest private static List> merge(final Collection segments) { final List> retVal = Lists.newArrayList(); - final MergerClient mergerClient = new MergerClient() + final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null) { @Override - public void runRequest(String dataSource, List segmentsToMerge) + public void mergeSegments(List segmentsToMerge) { retVal.add(segmentsToMerge); } }; - final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(mergerClient); + final AtomicReference whitelistRef = new AtomicReference(null); + final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(indexingServiceClient, whitelistRef); final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder() .withAvailableSegments(ImmutableSet.copyOf(segments)) .withMergeBytesLimit(mergeBytesLimit) diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index 895659586ec..584ae31de47 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -152,8 +152,6 @@ public class DruidMasterTest new NoopServiceEmitter(), scheduledExecutorFactory, loadManagementPeons, - null, - null, null ); }