Fix SQLMetadataSegmentManager (#5001)

This commit is contained in:
Jihoon Son 2017-11-01 00:02:41 +09:00 committed by Gian Merlino
parent b7fc1424dd
commit e96daa2593
4 changed files with 169 additions and 111 deletions

View File

@ -115,6 +115,11 @@ public final class LifecycleLock
}
}
boolean isStarted()
{
return getState() == START_EXITED_SUCCESSFUL;
}
boolean awaitStarted()
{
try {
@ -124,7 +129,7 @@ public final class LifecycleLock
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return getState() == START_EXITED_SUCCESSFUL;
return isStarted();
}
boolean awaitStarted(long timeNanos)
@ -138,7 +143,7 @@ public final class LifecycleLock
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return getState() == START_EXITED_SUCCESSFUL;
return isStarted();
}
@Override
@ -210,6 +215,15 @@ public final class LifecycleLock
sync.exitStart();
}
/**
* Returns {@code true} if {@link #started()} was called before that. Returns {@code false} if {@link #started()} is
* not called before {@link #exitStart()}, or if {@link #canStop()} is already called on this LifecycleLock.
*/
public boolean isStarted()
{
return sync.isStarted();
}
/**
* Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called
* before that. Returns {@code false} if {@link #started()} is not called before {@link #exitStart()}, or if {@link

View File

@ -379,6 +379,9 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycleLock.exitStop();
}
}
@Override

View File

@ -35,12 +35,13 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidDataSource;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
import io.druid.guice.ManageLifecycle;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.timeline.DataSegment;
@ -82,20 +83,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
private final Object lock = new Object();
private final LifecycleLock lifecycleLock = new LifecycleLock();
private final ObjectMapper jsonMapper;
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef;
private final SQLMetadataConnector connector;
private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;
private volatile boolean started = false;
@Inject
public SQLMetadataSegmentManager(
ObjectMapper jsonMapper,
@ -107,7 +105,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
this.jsonMapper = jsonMapper;
this.config = config;
this.dbTables = dbTables;
this.dataSources = new AtomicReference<>(
this.dataSourcesRef = new AtomicReference<>(
new ConcurrentHashMap<String, DruidDataSource>()
);
this.connector = connector;
@ -117,11 +115,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
if (!lifecycleLock.canStart()) {
return;
}
try {
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
final Duration delay = config.get().getPollDuration().toStandardDuration();
@ -144,7 +142,10 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
delay.getMillis(),
TimeUnit.MILLISECONDS
);
started = true;
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
@ -152,18 +153,24 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
if (!lifecycleLock.canStop()) {
return;
}
try {
final ConcurrentHashMap<String, DruidDataSource> emptyMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, DruidDataSource> current;
do {
current = dataSourcesRef.get();
} while (!dataSourcesRef.compareAndSet(current, emptyMap));
started = false;
dataSources.set(new ConcurrentHashMap<String, DruidDataSource>());
future.cancel(false);
future = null;
exec.shutdownNow();
exec = null;
}
finally {
lifecycleLock.exitStop();
}
}
@Override
@ -302,31 +309,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
public boolean removeDatasource(final String ds)
{
try {
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
if (!dataSourceMap.containsKey(ds)) {
log.warn("Cannot delete datasource %s, does not exist", ds);
return false;
}
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
)
.bind("dataSource", ds)
.execute();
return null;
}
}
final int removed = connector.getDBI().withHandle(
handle -> handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
).bind("dataSource", ds).execute()
);
dataSourceMap.remove(ds);
dataSourcesRef.get().remove(ds);
if (removed == 0) {
return false;
}
}
catch (Exception e) {
log.error(e, "Error removing datasource %s", ds);
@ -340,34 +333,25 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
public boolean removeSegment(String ds, final String segmentID)
{
try {
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())
).bind("segmentID", segmentID)
.execute();
return null;
}
}
final int removed = connector.getDBI().withHandle(
handle -> handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())
).bind("segmentID", segmentID).execute()
);
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
if (!dataSourceMap.containsKey(ds)) {
log.warn("Cannot find datasource %s", ds);
return false;
}
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSourcesRef.get();
DruidDataSource dataSource = dataSourceMap.get(ds);
dataSource.removePartition(segmentID);
if (dataSource != null) {
dataSource.removePartition(segmentID);
if (dataSource.isEmpty()) {
dataSourceMap.remove(ds);
if (dataSource.isEmpty()) {
dataSourceMap.remove(ds);
}
}
if (removed == 0) {
return false;
}
}
catch (Exception e) {
@ -381,69 +365,59 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
@Override
public boolean isStarted()
{
return started;
return lifecycleLock.isStarted();
}
@Override
public DruidDataSource getInventoryValue(String key)
{
return dataSources.get().get(key);
return dataSourcesRef.get().get(key);
}
@Override
public Collection<DruidDataSource> getInventory()
{
return dataSources.get().values();
return dataSourcesRef.get().values();
}
@Override
public Collection<String> getAllDatasourceNames()
{
synchronized (lock) {
return connector.getDBI().withHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable())
)
.fold(
Lists.<String>newArrayList(),
new Folder3<ArrayList<String>, Map<String, Object>>()
{
@Override
public ArrayList<String> fold(
ArrayList<String> druidDataSources,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
druidDataSources.add(
MapUtils.getString(stringObjectMap, "datasource")
);
return druidDataSources;
}
}
);
}
}
);
}
return connector.getDBI().withHandle(
handle -> handle.createQuery(
StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable())
)
.fold(
new ArrayList<>(),
new Folder3<List<String>, Map<String, Object>>()
{
@Override
public List<String> fold(
List<String> druidDataSources,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
druidDataSources.add(
MapUtils.getString(stringObjectMap, "datasource")
);
return druidDataSources;
}
}
)
);
}
@Override
public void poll()
{
try {
if (!started) {
if (!lifecycleLock.isStarted()) {
return;
}
ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<String, DruidDataSource>();
ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>();
log.debug("Starting polling of segment table");
@ -525,11 +499,10 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
}
}
synchronized (lock) {
if (started) {
dataSources.set(newDataSources);
}
}
ConcurrentHashMap<String, DruidDataSource> current;
do {
current = dataSourcesRef.get();
} while (!dataSourcesRef.compareAndSet(current, newDataSources));
}
catch (Exception e) {
log.makeAlert(e, "Problem polling DB.").emit();

View File

@ -32,11 +32,14 @@ import io.druid.java.util.common.StringUtils;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
public class MetadataSegmentManagerTest
{
@ -102,6 +105,14 @@ public class MetadataSegmentManagerTest
publisher.publishSegment(segment2);
}
@After
public void teardown()
{
if (manager.isStarted()) {
manager.stop();
}
}
@Test
public void testPoll()
{
@ -115,7 +126,6 @@ public class MetadataSegmentManagerTest
ImmutableSet.of(segment1, segment2),
manager.getInventoryValue("wikipedia").getSegments()
);
manager.stop();
}
@Test
@ -162,4 +172,62 @@ public class MetadataSegmentManagerTest
manager.getUnusedSegmentIntervals("wikipedia", Intervals.of("1970/3000"), 5)
);
}
@Test
public void testRemoveDataSource() throws IOException
{
manager.start();
manager.poll();
final String newDataSource = "wikipedia2";
final DataSegment newSegment = new DataSegment(
newDataSource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publisher.publishSegment(newSegment);
Assert.assertNull(manager.getInventoryValue(newDataSource));
Assert.assertTrue(manager.removeDatasource(newDataSource));
}
@Test
public void testRemoveDataSegment() throws IOException
{
manager.start();
manager.poll();
final String newDataSource = "wikipedia2";
final DataSegment newSegment = new DataSegment(
newDataSource,
Intervals.of("2017-10-15T00:00:00.000/2017-10-16T00:00:00.000"),
"2017-10-15T20:19:12.565Z",
ImmutableMap.of(
"type", "s3_zip",
"bucket", "test",
"key", "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip"
),
ImmutableList.of("dim1", "dim2", "dim3"),
ImmutableList.of("count", "value"),
NoneShardSpec.instance(),
0,
1234L
);
publisher.publishSegment(newSegment);
Assert.assertNull(manager.getInventoryValue(newDataSource));
Assert.assertTrue(manager.removeSegment(newDataSource, newSegment.getIdentifier()));
}
}