From 306f4b8fa2cb8f976bbfe128228f0f75aa30f673 Mon Sep 17 00:00:00 2001 From: fjy Date: Wed, 8 Jan 2014 14:08:41 -0800 Subject: [PATCH] more tests --- .../server/bridge/DruidClusterBridgeTest.java | 230 ++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java new file mode 100644 index 00000000000..a8d9dd69b55 --- /dev/null +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -0,0 +1,230 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.bridge; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.common.lifecycle.Lifecycle; +import io.druid.client.BatchServerInventoryView; +import io.druid.client.DruidServer; +import io.druid.client.ServerView; +import io.druid.curator.PotentiallyGzippedCompressionProvider; +import io.druid.curator.announcement.Announcer; +import io.druid.db.DatabaseSegmentManager; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.realtime.DbSegmentPublisher; +import io.druid.server.DruidNode; +import io.druid.server.coordination.BatchDataSegmentAnnouncer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import junit.framework.Assert; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.easymock.EasyMock; +import org.joda.time.Duration; +import org.junit.Test; + +import java.util.Arrays; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class DruidClusterBridgeTest +{ + @Test + public void testRun() throws Exception + { + TestingCluster localCluster = new TestingCluster(1); + localCluster.start(); + + CuratorFramework localCf = CuratorFrameworkFactory.builder() + .connectString(localCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider( + new PotentiallyGzippedCompressionProvider( + false + ) + ) + .build(); + localCf.start(); + + + TestingCluster remoteCluster = new TestingCluster(1); + remoteCluster.start(); + + CuratorFramework remoteCf = CuratorFrameworkFactory.builder() + .connectString(remoteCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider( + new PotentiallyGzippedCompressionProvider( + false + ) + ) + .build(); + remoteCf.start(); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + DruidClusterBridgeConfig config = new DruidClusterBridgeConfig() + { + @Override + public String getTier() + { + return DruidServer.DEFAULT_TIER; + } + + @Override + public Duration getStartDelay() + { + return new Duration(0); + } + + @Override + public Duration getPeriod() + { + return new Duration(Long.MAX_VALUE); + } + + @Override + public String getBrokerServiceName() + { + return "testz0rz"; + } + + @Override + public int getPriority() + { + return 0; + } + }; + + ScheduledExecutorFactory factory = ScheduledExecutors.createFactory(new Lifecycle()); + + DruidNode me = new DruidNode( + "me", + "localhost", + 8080 + ); + + AtomicReference leaderLatch = new AtomicReference<>(new LeaderLatch(localCf, "test")); + + ZkPathsConfig zkPathsConfig = new ZkPathsConfig() + { + @Override + public String getZkBasePath() + { + return "/druid"; + } + }; + DruidServerMetadata metadata = new DruidServerMetadata( + "test", + "localhost", + 1000, + "bridge", + DruidServer.DEFAULT_TIER, + 0 + ); + DbSegmentPublisher dbSegmentPublisher = EasyMock.createMock(DbSegmentPublisher.class); + DatabaseSegmentManager databaseSegmentManager = EasyMock.createMock(DatabaseSegmentManager.class); + ServerView serverView = EasyMock.createMock(ServerView.class); + BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator( + jsonMapper, + zkPathsConfig, + metadata, + remoteCf, + dbSegmentPublisher, + databaseSegmentManager, + serverView + ); + + Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor()); + announcer.start(); + announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me)); + BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class); + BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class); + + EasyMock.expect(batchServerInventoryView.getInventory()).andReturn( + Arrays.asList( + new DruidServer("1", "localhost", 117, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("2", "localhost", 1, "historical", DruidServer.DEFAULT_TIER, 0) + ) + ); + batchServerInventoryView.registerSegmentCallback( + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall(); + batchServerInventoryView.start(); + EasyMock.expectLastCall(); + EasyMock.replay(batchServerInventoryView); + + + DruidClusterBridge bridge = new DruidClusterBridge( + jsonMapper, + config, + factory, + me, + localCf, + leaderLatch, + bridgeZkCoordinator, + announcer, + batchDataSegmentAnnouncer, + batchServerInventoryView + ); + + bridge.start(); + + int retry = 0; + while (!bridge.isLeader()) { + if (retry > 5) { + throw new ISE("Unable to become leader"); + } + + Thread.sleep(100); + retry++; + } + + String path = "/druid/announcements/localhost:8080"; + retry = 0; + while (remoteCf.checkExists().forPath(path) == null) { + if (retry > 5) { + throw new ISE("Unable to announce"); + } + + Thread.sleep(100); + retry++; + } + + DruidServerMetadata announced = jsonMapper.readValue( + remoteCf.getData().forPath(path), + DruidServerMetadata.class + ); + + Assert.assertEquals(118, announced.getMaxSize()); + + EasyMock.verify(batchServerInventoryView); + } +}