diff --git a/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java b/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java deleted file mode 100644 index a6863991c53..00000000000 --- a/server/src/main/java/io/druid/db/DerbyMetadataRuleManager.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.db; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import io.druid.guice.annotations.Json; -import io.druid.server.coordinator.rules.Rule; -import org.skife.jdbi.v2.IDBI; - -import java.sql.Blob; -import java.util.List; -import java.util.Map; - -public class DerbyMetadataRuleManager extends SQLMetadataRuleManager -{ - private final ObjectMapper jsonMapper; - - @Inject - public DerbyMetadataRuleManager( - @Json ObjectMapper jsonMapper, - Supplier config, - Supplier dbTables, - IDBI dbi - ) { - super(jsonMapper, config, dbTables, dbi); - this.jsonMapper = jsonMapper; - } - - @Override - protected List getRules(Map stringObjectMap) { - try { - Blob payload = (Blob)stringObjectMap.get("payload"); - List rules = jsonMapper.readValue( - payload.getBinaryStream(), new TypeReference>() {} - ); - return rules; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } -} diff --git a/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java b/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java deleted file mode 100644 index 88015b014e8..00000000000 --- a/server/src/main/java/io/druid/db/DerbyMetadataRuleManagerProvider.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.db; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import com.metamx.common.lifecycle.Lifecycle; -import org.skife.jdbi.v2.IDBI; - -/** - */ -public class DerbyMetadataRuleManagerProvider implements MetadataRuleManagerProvider -{ - private final ObjectMapper jsonMapper; - private final Supplier config; - private final Supplier dbTables; - private final MetadataStorageConnector dbConnector; - private final Lifecycle lifecycle; - private final IDBI dbi; - - @Inject - public DerbyMetadataRuleManagerProvider( - ObjectMapper jsonMapper, - Supplier config, - Supplier dbTables, - MetadataStorageConnector dbConnector, - IDBI dbi, - Lifecycle lifecycle - ) - { - this.jsonMapper = jsonMapper; - this.config = config; - this.dbTables = dbTables; - this.dbConnector = dbConnector; - this.dbi = dbi; - this.lifecycle = lifecycle; - } - - @Override - public DerbyMetadataRuleManager get() - { - try { - lifecycle.addMaybeStartHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - dbConnector.createRulesTable(); - SQLMetadataRuleManager.createDefaultRule( - dbi, dbTables.get().getRulesTable(), config.get().getDefaultRule(), jsonMapper - ); - } - - @Override - public void stop() - { - - } - } - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - return new DerbyMetadataRuleManager(jsonMapper, config, dbTables, dbi); - } -} \ No newline at end of file diff --git a/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java b/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java deleted file mode 100644 index 00036c0d3d7..00000000000 --- a/server/src/main/java/io/druid/db/DerbyMetadataSegmentManager.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.db; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; -import com.google.inject.Inject; -import com.metamx.common.logger.Logger; -import io.druid.client.DruidDataSource; -import io.druid.guice.ManageLifecycle; -import io.druid.timeline.DataSegment; -import io.druid.timeline.TimelineObjectHolder; -import io.druid.timeline.VersionedIntervalTimeline; -import io.druid.timeline.partition.PartitionChunk; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.skife.jdbi.v2.Batch; -import org.skife.jdbi.v2.FoldController; -import org.skife.jdbi.v2.Folder3; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.IDBI; -import org.skife.jdbi.v2.StatementContext; -import org.skife.jdbi.v2.tweak.HandleCallback; - -import java.sql.SQLException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; - -/** - */ -@ManageLifecycle -public class DerbyMetadataSegmentManager extends SQLMetadataSegmentManager -{ - private static final Logger log = new Logger(DerbyMetadataSegmentManager.class); - - private final Object lock = new Object(); - - private final ObjectMapper jsonMapper; - private final Supplier dbTables; - private final AtomicReference> dataSources; - private final IDBI dbi; - - @Inject - public DerbyMetadataSegmentManager( - ObjectMapper jsonMapper, - Supplier config, - Supplier dbTables, - IDBI dbi - ) - { - super(jsonMapper, config, dbTables, dbi); - this.jsonMapper = jsonMapper; - this.dbTables = dbTables; - this.dataSources = new AtomicReference>( - new ConcurrentHashMap() - ); - this.dbi = dbi; - } - - @Override - public boolean enableDatasource(final String ds) - { - try { - VersionedIntervalTimeline segmentTimeline = dbi.withHandle( - new HandleCallback>() - { - @Override - public VersionedIntervalTimeline withHandle(Handle handle) throws Exception - { - return handle.createQuery( - String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", getSegmentsTable()) - ) - .bind("dataSource", ds) - .fold( - new VersionedIntervalTimeline(Ordering.natural()), - new Folder3, Map>() - { - @Override - public VersionedIntervalTimeline fold( - VersionedIntervalTimeline timeline, - Map stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - java.sql.Clob payloadClob = (java.sql.Clob)stringObjectMap.get("payload"); - String payload = payloadClob.getSubString(1, (int)payloadClob.length()).replace("\\", ""); - DataSegment segment = jsonMapper.readValue( - payload, - DataSegment.class - ); - - timeline.add( - segment.getInterval(), - segment.getVersion(), - segment.getShardSpec().createChunk(segment) - ); - - return timeline; - } - catch (Exception e) { - throw new SQLException(e.toString()); - } - } - } - ); - } - } - ); - - final List segments = Lists.newArrayList(); - for (TimelineObjectHolder objectHolder : segmentTimeline.lookup( - new Interval( - "0000-01-01/3000-01-01" - ) - )) { - for (PartitionChunk partitionChunk : objectHolder.getObject()) { - segments.add(partitionChunk.getObject()); - } - } - - if (segments.isEmpty()) { - log.warn("No segments found in the database!"); - return false; - } - - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - Batch batch = handle.createBatch(); - - for (DataSegment segment : segments) { - batch.add( - String.format( - "UPDATE %s SET used=true WHERE id = '%s'", - getSegmentsTable(), - segment.getIdentifier() - ) - ); - } - batch.execute(); - - return null; - } - } - ); - } - catch (Exception e) { - log.error(e, "Exception enabling datasource %s", ds); - return false; - } - - return true; - } - - @Override - public void poll() - { - try { - if (!isStarted()) { - return; - } - - ConcurrentHashMap newDataSources = new ConcurrentHashMap(); - - List> segmentRows = dbi.withHandle( - new HandleCallback>>() - { - @Override - public List> withHandle(Handle handle) throws Exception - { - return handle.createQuery( - String.format("SELECT * FROM %s WHERE used=true", getSegmentsTable()) - ).fold( - new LinkedList>(), - new Folder3>, Map>() - { - @Override - public LinkedList> fold( - LinkedList> retVal, - Map stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - java.sql.Clob payloadClob = (java.sql.Clob)stringObjectMap.get("payload"); - String payload = payloadClob.getSubString(1, (int)payloadClob.length()).replace("\\", ""); - stringObjectMap.put("payload", payload); - retVal.add(stringObjectMap); - return retVal; - } - } - ); - } - } - ); - - - - if (segmentRows == null || segmentRows.isEmpty()) { - log.warn("No segments found in the database!"); - return; - } - - log.info("Polled and found %,d segments in the database", segmentRows.size()); - - for (final Map segmentRow : segmentRows) { - DataSegment segment = jsonMapper.readValue((String) segmentRow.get("payload"), DataSegment.class); - - String datasourceName = segment.getDataSource(); - - DruidDataSource dataSource = newDataSources.get(datasourceName); - if (dataSource == null) { - dataSource = new DruidDataSource( - datasourceName, - ImmutableMap.of("created", new DateTime().toString()) - ); - - Object shouldBeNull = newDataSources.put( - datasourceName, - dataSource - ); - if (shouldBeNull != null) { - log.warn( - "Just put key[%s] into dataSources and what was there wasn't null!? It was[%s]", - datasourceName, - shouldBeNull - ); - } - } - - if (!dataSource.getSegments().contains(segment)) { - dataSource.addSegment(segment.getIdentifier(), segment); - } - } - - synchronized (lock) { - if (isStarted()) { - dataSources.set(newDataSources); - } - } - } - catch (Exception e) { - log.error(e, "Problem polling DB."); - } - } - - private String getSegmentsTable() - { - return dbTables.get().getSegmentsTable(); - } -} diff --git a/server/src/main/java/io/druid/db/SQLMetadataConnector.java b/server/src/main/java/io/druid/db/SQLMetadataConnector.java index f3bf2ae6cd8..e3067fb50ed 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/db/SQLMetadataConnector.java @@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; +import org.skife.jdbi.v2.util.ByteArrayMapper; import javax.sql.DataSource; import java.sql.Connection; @@ -313,17 +314,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector { List matched = handle.createQuery(selectStatement) .bind("key", key) - .map( - new ResultSetMapper() - { - @Override - public byte[] map(int index, ResultSet r, StatementContext ctx) - throws SQLException - { - return r.getBytes(valueColumn); - } - } - ).list(); + .map(ByteArrayMapper.FIRST) + .list(); if (matched.isEmpty()) { return null; diff --git a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java index 0b348f33ece..68df33225c1 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java @@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; -import com.metamx.common.MapUtils; +import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -47,7 +47,10 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; +import org.skife.jdbi.v2.tweak.ResultSetMapper; +import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.List; @@ -215,30 +218,52 @@ public class SQLMetadataRuleManager implements MetadataRuleManager + "ON r.datasource = ds.datasource and r.version = ds.version", getRulesTable() ) - ).fold( - Maps.>newHashMap(), - new Folder3>, Map>() + ).map( + new ResultSetMapper>>() { @Override - public Map> fold( - Map> retVal, - Map stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException + public Pair> map(int index, ResultSet r, StatementContext ctx) + throws SQLException { try { - String dataSource = MapUtils.getString(stringObjectMap, "dataSource"); - List rules = getRules(stringObjectMap); - retVal.put(dataSource, rules); - return retVal; + return Pair.of( + r.getString("dataSource"), + jsonMapper.>readValue( + r.getBytes("payload"), new TypeReference>() + { + } + ) + ); } - catch (Exception e) { + catch (IOException e) { throw Throwables.propagate(e); } } } - ); + ) + .fold( + Maps.>newHashMap(), + new Folder3>, Pair>>() + { + @Override + public Map> fold( + Map> retVal, + Pair> stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + String dataSource = stringObjectMap.lhs; + retVal.put(dataSource, stringObjectMap.rhs); + return retVal; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); } } ) @@ -314,17 +339,8 @@ public class SQLMetadataRuleManager implements MetadataRuleManager return true; } - private String getRulesTable() {return dbTables.get().getRulesTable();} - - protected List getRules(Map stringObjectMap) { - try { - return jsonMapper.readValue( - MapUtils.getString(stringObjectMap, "payload"), new TypeReference>() - { - } - ); - } catch (Exception e) { - throw Throwables.propagate(e); - } + private String getRulesTable() + { + return dbTables.get().getRulesTable(); } -} \ No newline at end of file +} diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java index c9f7832d7b0..f89b4d8e186 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java @@ -47,6 +47,7 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; +import org.skife.jdbi.v2.util.ByteArrayMapper; import java.sql.SQLException; import java.util.ArrayList; @@ -151,21 +152,22 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", getSegmentsTable()) ) .bind("dataSource", ds) + .map(ByteArrayMapper.FIRST) .fold( new VersionedIntervalTimeline(Ordering.natural()), - new Folder3, Map>() + new Folder3, byte[]>() { @Override public VersionedIntervalTimeline fold( VersionedIntervalTimeline timeline, - Map stringObjectMap, + byte[] payload, FoldController foldController, StatementContext statementContext ) throws SQLException { try { DataSegment segment = jsonMapper.readValue( - (String) stringObjectMap.get("payload"), + payload, DataSegment.class ); @@ -410,21 +412,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager ConcurrentHashMap newDataSources = new ConcurrentHashMap(); - List> segmentRows = dbi.withHandle( - new HandleCallback>>() + List segmentRows = dbi.withHandle( + new HandleCallback>() { @Override - public List> withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) throws Exception { return handle.createQuery( String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()) - ).list(); + ) + .map(ByteArrayMapper.FIRST) + .list(); } } ); - - if (segmentRows == null || segmentRows.isEmpty()) { log.warn("No segments found in the database!"); return; @@ -432,8 +434,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager log.info("Polled and found %,d segments in the database", segmentRows.size()); - for (final Map segmentRow : segmentRows) { - DataSegment segment = jsonMapper.readValue((String) segmentRow.get("payload"), DataSegment.class); + for (final byte[] payload : segmentRows) { + DataSegment segment = jsonMapper.readValue(payload, DataSegment.class); String datasourceName = segment.getDataSource(); diff --git a/server/src/main/java/io/druid/guice/DerbyMetadataStorageDruidModule.java b/server/src/main/java/io/druid/guice/DerbyMetadataStorageDruidModule.java index bc80fc42fec..dd17997b9d3 100644 --- a/server/src/main/java/io/druid/guice/DerbyMetadataStorageDruidModule.java +++ b/server/src/main/java/io/druid/guice/DerbyMetadataStorageDruidModule.java @@ -23,9 +23,9 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; -import io.druid.db.DerbyMetadataRuleManager; -import io.druid.db.DerbyMetadataRuleManagerProvider; -import io.druid.db.DerbyMetadataSegmentManager; +import io.druid.db.SQLMetadataRuleManager; +import io.druid.db.SQLMetadataRuleManagerProvider; +import io.druid.db.SQLMetadataSegmentManager; import io.druid.db.SQLMetadataSegmentManagerProvider; import io.druid.db.IndexerSQLMetadataStorageCoordinator; import io.druid.db.MetadataRuleManager; @@ -56,16 +56,16 @@ public class DerbyMetadataStorageDruidModule implements Module binder, "druid.db.type", Key.get(MetadataStorageConnector.class), Key.get(DerbyConnector.class) ); PolyBind.createChoice( - binder, "druid.db.type", Key.get(MetadataSegmentManager.class), Key.get(DerbyMetadataSegmentManager.class) + binder, "druid.db.type", Key.get(MetadataSegmentManager.class), Key.get(SQLMetadataSegmentManager.class) ); PolyBind.createChoice( binder, "druid.db.type", Key.get(MetadataSegmentManagerProvider.class), Key.get(SQLMetadataSegmentManagerProvider.class) ); PolyBind.createChoice( - binder, "druid.db.type", Key.get(MetadataRuleManager.class), Key.get(DerbyMetadataRuleManager.class) + binder, "druid.db.type", Key.get(MetadataRuleManager.class), Key.get(SQLMetadataRuleManager.class) ); PolyBind.createChoice( - binder, "druid.db.type", Key.get(MetadataRuleManagerProvider.class), Key.get(DerbyMetadataRuleManagerProvider.class) + binder, "druid.db.type", Key.get(MetadataRuleManagerProvider.class), Key.get(SQLMetadataRuleManagerProvider.class) ); PolyBind.createChoice( binder, "druid.db.type", Key.get(SegmentPublisher.class), Key.get(SQLMetadataSegmentPublisher.class) @@ -94,7 +94,7 @@ public class DerbyMetadataStorageDruidModule implements Module PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class)) .addBinding("derby") - .to(DerbyMetadataSegmentManager.class) + .to(SQLMetadataSegmentManager.class) .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class)) @@ -104,12 +104,12 @@ public class DerbyMetadataStorageDruidModule implements Module PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class)) .addBinding("derby") - .to(DerbyMetadataRuleManager.class) + .to(SQLMetadataRuleManager.class) .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class)) .addBinding("derby") - .to(DerbyMetadataRuleManagerProvider.class) + .to(SQLMetadataRuleManagerProvider.class) .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class)) diff --git a/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java b/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java index 42c50903353..3f087645e2f 100644 --- a/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/db/MetadataSegmentManagerTest.java @@ -37,7 +37,7 @@ import java.util.Map; */ public class MetadataSegmentManagerTest { - private DerbyMetadataSegmentManager manager; + private SQLMetadataSegmentManager manager; private IDBI dbi; private List> testRows; @@ -45,7 +45,7 @@ public class MetadataSegmentManagerTest public void setUp() throws Exception { dbi = EasyMock.createMock(IDBI.class); - manager = new DerbyMetadataSegmentManager( + manager = new SQLMetadataSegmentManager( new DefaultObjectMapper(), Suppliers.ofInstance(new MetadataSegmentManagerConfig()), Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase("test")),