diff --git a/common/src/main/java/io/druid/concurrent/LifecycleLock.java b/common/src/main/java/io/druid/concurrent/LifecycleLock.java index e872344bb70..a94e3cd31a3 100644 --- a/common/src/main/java/io/druid/concurrent/LifecycleLock.java +++ b/common/src/main/java/io/druid/concurrent/LifecycleLock.java @@ -115,6 +115,11 @@ public final class LifecycleLock } } + boolean isStarted() + { + return getState() == START_EXITED_SUCCESSFUL; + } + boolean awaitStarted() { try { @@ -124,7 +129,7 @@ public final class LifecycleLock catch (InterruptedException e) { throw new RuntimeException(e); } - return getState() == START_EXITED_SUCCESSFUL; + return isStarted(); } boolean awaitStarted(long timeNanos) @@ -138,7 +143,7 @@ public final class LifecycleLock catch (InterruptedException e) { throw new RuntimeException(e); } - return getState() == START_EXITED_SUCCESSFUL; + return isStarted(); } @Override @@ -210,6 +215,15 @@ public final class LifecycleLock sync.exitStart(); } + /** + * Returns {@code true} if {@link #started()} was called before that. Returns {@code false} if {@link #started()} is + * not called before {@link #exitStart()}, or if {@link #canStop()} is already called on this LifecycleLock. + */ + public boolean isStarted() + { + return sync.isStarted(); + } + /** * Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called * before that. Returns {@code false} if {@link #started()} is not called before {@link #exitStart()}, or if {@link diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 3038e292048..c2f2a1d7833 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -379,6 +379,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer catch (Exception e) { throw Throwables.propagate(e); } + finally { + lifecycleLock.exitStop(); + } } @Override diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index 08c75a540ee..a7b073d0370 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -35,12 +35,13 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.client.DruidDataSource; -import io.druid.java.util.common.concurrent.Execs; +import io.druid.concurrent.LifecycleLock; import io.druid.guice.ManageLifecycle; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.MapUtils; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.timeline.DataSegment; @@ -82,20 +83,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private static final Interner DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class); - - private final Object lock = new Object(); + private final LifecycleLock lifecycleLock = new LifecycleLock(); private final ObjectMapper jsonMapper; private final Supplier config; private final Supplier dbTables; - private final AtomicReference> dataSources; + private final AtomicReference> dataSourcesRef; private final SQLMetadataConnector connector; private volatile ListeningScheduledExecutorService exec = null; private volatile ListenableFuture future = null; - private volatile boolean started = false; - @Inject public SQLMetadataSegmentManager( ObjectMapper jsonMapper, @@ -107,7 +105,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager this.jsonMapper = jsonMapper; this.config = config; this.dbTables = dbTables; - this.dataSources = new AtomicReference<>( + this.dataSourcesRef = new AtomicReference<>( new ConcurrentHashMap() ); this.connector = connector; @@ -117,11 +115,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @LifecycleStart public void start() { - synchronized (lock) { - if (started) { - return; - } + if (!lifecycleLock.canStart()) { + return; + } + try { exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d")); final Duration delay = config.get().getPollDuration().toStandardDuration(); @@ -144,7 +142,10 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager delay.getMillis(), TimeUnit.MILLISECONDS ); - started = true; + lifecycleLock.started(); + } + finally { + lifecycleLock.exitStart(); } } @@ -152,18 +153,24 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @LifecycleStop public void stop() { - synchronized (lock) { - if (!started) { - return; - } + if (!lifecycleLock.canStop()) { + return; + } + try { + final ConcurrentHashMap emptyMap = new ConcurrentHashMap<>(); + ConcurrentHashMap current; + do { + current = dataSourcesRef.get(); + } while (!dataSourcesRef.compareAndSet(current, emptyMap)); - started = false; - dataSources.set(new ConcurrentHashMap()); future.cancel(false); future = null; exec.shutdownNow(); exec = null; } + finally { + lifecycleLock.exitStop(); + } } @Override @@ -302,31 +309,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager public boolean removeDatasource(final String ds) { try { - ConcurrentHashMap dataSourceMap = dataSources.get(); - - if (!dataSourceMap.containsKey(ds)) { - log.warn("Cannot delete datasource %s, does not exist", ds); - return false; - } - - connector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable()) - ) - .bind("dataSource", ds) - .execute(); - - return null; - } - } + final int removed = connector.getDBI().withHandle( + handle -> handle.createStatement( + StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable()) + ).bind("dataSource", ds).execute() ); - dataSourceMap.remove(ds); + dataSourcesRef.get().remove(ds); + + if (removed == 0) { + return false; + } } catch (Exception e) { log.error(e, "Error removing datasource %s", ds); @@ -340,34 +333,25 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager public boolean removeSegment(String ds, final String segmentID) { try { - connector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable()) - ).bind("segmentID", segmentID) - .execute(); - - return null; - } - } + final int removed = connector.getDBI().withHandle( + handle -> handle.createStatement( + StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable()) + ).bind("segmentID", segmentID).execute() ); - ConcurrentHashMap dataSourceMap = dataSources.get(); - - if (!dataSourceMap.containsKey(ds)) { - log.warn("Cannot find datasource %s", ds); - return false; - } + ConcurrentHashMap dataSourceMap = dataSourcesRef.get(); DruidDataSource dataSource = dataSourceMap.get(ds); - dataSource.removePartition(segmentID); + if (dataSource != null) { + dataSource.removePartition(segmentID); - if (dataSource.isEmpty()) { - dataSourceMap.remove(ds); + if (dataSource.isEmpty()) { + dataSourceMap.remove(ds); + } + } + + if (removed == 0) { + return false; } } catch (Exception e) { @@ -381,69 +365,59 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @Override public boolean isStarted() { - return started; + return lifecycleLock.isStarted(); } @Override public DruidDataSource getInventoryValue(String key) { - return dataSources.get().get(key); + return dataSourcesRef.get().get(key); } @Override public Collection getInventory() { - return dataSources.get().values(); + return dataSourcesRef.get().values(); } @Override public Collection getAllDatasourceNames() { - synchronized (lock) { - return connector.getDBI().withHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) throws Exception - { - return handle.createQuery( - StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable()) - ) - .fold( - Lists.newArrayList(), - new Folder3, Map>() - { - @Override - public ArrayList fold( - ArrayList druidDataSources, - Map stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - druidDataSources.add( - MapUtils.getString(stringObjectMap, "datasource") - ); - return druidDataSources; - } - } - ); - - } - } - ); - } + return connector.getDBI().withHandle( + handle -> handle.createQuery( + StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable()) + ) + .fold( + new ArrayList<>(), + new Folder3, Map>() + { + @Override + public List fold( + List druidDataSources, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + druidDataSources.add( + MapUtils.getString(stringObjectMap, "datasource") + ); + return druidDataSources; + } + } + ) + ); } @Override public void poll() { try { - if (!started) { + if (!lifecycleLock.isStarted()) { return; } - ConcurrentHashMap newDataSources = new ConcurrentHashMap(); + ConcurrentHashMap newDataSources = new ConcurrentHashMap<>(); log.debug("Starting polling of segment table"); @@ -525,11 +499,10 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager } } - synchronized (lock) { - if (started) { - dataSources.set(newDataSources); - } - } + ConcurrentHashMap current; + do { + current = dataSourcesRef.get(); + } while (!dataSourcesRef.compareAndSet(current, newDataSources)); } catch (Exception e) { log.makeAlert(e, "Problem polling DB.").emit(); diff --git a/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java index 3faad305e53..29dd39fe225 100644 --- a/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java @@ -32,11 +32,14 @@ import io.druid.java.util.common.StringUtils; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import java.io.IOException; + public class MetadataSegmentManagerTest { @@ -102,6 +105,14 @@ public class MetadataSegmentManagerTest publisher.publishSegment(segment2); } + @After + public void teardown() + { + if (manager.isStarted()) { + manager.stop(); + } + } + @Test public void testPoll() { @@ -115,7 +126,6 @@ public class MetadataSegmentManagerTest ImmutableSet.of(segment1, segment2), manager.getInventoryValue("wikipedia").getSegments() ); - manager.stop(); } @Test @@ -162,4 +172,62 @@ public class MetadataSegmentManagerTest manager.getUnusedSegmentIntervals("wikipedia", Intervals.of("1970/3000"), 5) ); } + + @Test + public void testRemoveDataSource() throws IOException + { + manager.start(); + manager.poll(); + + final String newDataSource = "wikipedia2"; + final DataSegment newSegment = new DataSegment( + newDataSource, + Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"), + "2017-10-15T20:19:12.565Z", + ImmutableMap.of( + "type", "s3_zip", + "bucket", "test", + "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip" + ), + ImmutableList.of("dim1", "dim2", "dim3"), + ImmutableList.of("count", "value"), + NoneShardSpec.instance(), + 0, + 1234L + ); + + publisher.publishSegment(newSegment); + + Assert.assertNull(manager.getInventoryValue(newDataSource)); + Assert.assertTrue(manager.removeDatasource(newDataSource)); + } + + @Test + public void testRemoveDataSegment() throws IOException + { + manager.start(); + manager.poll(); + + final String newDataSource = "wikipedia2"; + final DataSegment newSegment = new DataSegment( + newDataSource, + Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"), + "2017-10-15T20:19:12.565Z", + ImmutableMap.of( + "type", "s3_zip", + "bucket", "test", + "key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip" + ), + ImmutableList.of("dim1", "dim2", "dim3"), + ImmutableList.of("count", "value"), + NoneShardSpec.instance(), + 0, + 1234L + ); + + publisher.publishSegment(newSegment); + + Assert.assertNull(manager.getInventoryValue(newDataSource)); + Assert.assertTrue(manager.removeSegment(newDataSource, newSegment.getIdentifier())); + } }