diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 8ee3a22f57d..a2ad2e773e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeer; @@ -201,7 +200,11 @@ public class ReplicationAdmin implements Closeable { public static Map> parseTableCFsFromConfig(String tableCFsConfig) { return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); } - + + public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) + throws ReplicationException { + this.replicationPeers.updatePeerConfig(id, peerConfig); + } /** * Removes a peer cluster and stops the replication to it. * @param id a short name that identifies the cluster @@ -549,6 +552,11 @@ public class ReplicationAdmin implements Closeable { } } + @VisibleForTesting + public void peerAdded(String id) throws ReplicationException { + this.replicationPeers.peerAdded(id); + } + @VisibleForTesting List listReplicationPeers() { Map peers = listPeerConfigs(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 920eea6a605..3da01fe8825 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -71,4 +71,6 @@ public interface ReplicationPeer { */ public Map> getTableCFs(); + void trackPeerConfigChanges(ReplicationPeerConfigListener listener); + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java new file mode 100644 index 00000000000..4e04186003e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface ReplicationPeerConfigListener { + /** Callback method for when users update the ReplicationPeerConfig for this peer + * + * @param rpc The updated ReplicationPeerConfig + */ + void peerConfigUpdated(ReplicationPeerConfig rpc); + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index f7a24114021..a33690c8ec4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -109,7 +109,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase this.readPeerConfig(); } - private void readPeerConfig() { + private ReplicationPeerConfig readPeerConfig() { try { byte[] data = peerConfigTracker.getData(false); if (data != null) { @@ -118,6 +118,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase } catch (DeserializationException e) { LOG.error("", e); } + return this.peerConfig; } @Override @@ -162,6 +163,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase return this.tableCFs; } + @Override + public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { + if (this.peerConfigTracker != null){ + this.peerConfigTracker.setListener(listener); + } + } + @Override public void abort(String why, Throwable e) { LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig @@ -260,24 +268,36 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase */ public class PeerConfigTracker extends ZooKeeperNodeTracker { + ReplicationPeerConfigListener listener; + public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, Abortable abortable) { super(watcher, peerConfigNode, abortable); } - + + public synchronized void setListener(ReplicationPeerConfigListener listener){ + this.listener = listener; + } + @Override public synchronized void nodeCreated(String path) { if (path.equals(node)) { super.nodeCreated(path); - readPeerConfig(); + ReplicationPeerConfig config = readPeerConfig(); + if (listener != null){ + listener.peerConfigUpdated(config); + } } } @Override public synchronized void nodeDataChanged(String path) { + //superclass calls nodeCreated if (path.equals(node)) { super.nodeDataChanged(path); } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 1961a654436..9f70d95b313 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -151,4 +151,6 @@ public interface ReplicationPeers { * @return the configuration for the peer cluster, null if it was unable to get the configuration */ Pair getPeerConf(String peerId) throws ReplicationException; + + void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 367c6886574..916eaf84ea9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -351,6 +351,45 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return new Pair(peerConfig, otherConf); } + @Override + public void updatePeerConfig(String id, ReplicationPeerConfig newConfig) + throws ReplicationException { + ReplicationPeer peer = getPeer(id); + if (peer == null){ + throw new ReplicationException("Could not find peer Id " + id); + } + ReplicationPeerConfig existingConfig = peer.getPeerConfig(); + if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && + !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ + throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." + + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" + + newConfig.getClusterKey() + + "'"); + } + String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); + if (newConfig.getReplicationEndpointImpl() != null && + !newConfig.getReplicationEndpointImpl().isEmpty() && + !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){ + throw new ReplicationException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. Existing class '" + + existingConfig.getReplicationEndpointImpl() + + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); + } + //Update existingConfig's peer config and peer data with the new values, but don't touch config + // or data that weren't explicitly changed + existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); + existingConfig.getPeerData().putAll(newConfig.getPeerData()); + + try { + ZKUtil.setData(this.zookeeper, getPeerNode(id), + ReplicationSerDeHelper.toByteArray(existingConfig)); + } + catch(KeeperException ke){ + throw new ReplicationException("There was a problem trying to save changes to the " + + "replication peer " + id, ke); + } + } + /** * List all registered peer clusters and set a watch on their znodes. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 67051abd9be..d667269dda0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractService; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; /** * A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this @@ -35,11 +38,30 @@ import com.google.common.util.concurrent.AbstractService; public abstract class BaseReplicationEndpoint extends AbstractService implements ReplicationEndpoint { + private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class); protected Context ctx; @Override public void init(Context context) throws IOException { this.ctx = context; + + if (this.ctx != null){ + ReplicationPeer peer = this.ctx.getReplicationPeer(); + if (peer != null){ + peer.trackPeerConfigChanges(this); + } else { + LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + + " because there's no such peer"); + } + } + } + + @Override + /** + * No-op implementation for subclasses to override if they wish to execute logic if their config changes + */ + public void peerConfigUpdated(ReplicationPeerConfig rpc){ + } /** Returns a default set of filters */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index ac1257fa416..c92b53db98a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -46,14 +46,13 @@ import com.google.common.util.concurrent.Service; * and persisting of the WAL entries in the other cluster. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public interface ReplicationEndpoint extends Service { +public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener { @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { private final Configuration conf; private final FileSystem fs; private final TableDescriptors tableDescriptors; - private final ReplicationPeerConfig peerConfig; private final ReplicationPeer replicationPeer; private final String peerId; private final UUID clusterId; @@ -63,13 +62,11 @@ public interface ReplicationEndpoint extends Service { public Context( final Configuration conf, final FileSystem fs, - final ReplicationPeerConfig peerConfig, final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, final MetricsSource metrics, final TableDescriptors tableDescriptors) { - this.peerConfig = peerConfig; this.conf = conf; this.fs = fs; this.clusterId = clusterId; @@ -91,7 +88,7 @@ public interface ReplicationEndpoint extends Service { return peerId; } public ReplicationPeerConfig getPeerConfig() { - return peerConfig; + return replicationPeer.getPeerConfig(); } public ReplicationPeer getReplicationPeer() { return replicationPeer; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 9ff4b2d9003..83e0205c312 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -494,7 +494,7 @@ public class ReplicationSourceManager implements ReplicationListener { // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), - fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); + fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); return src; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index 3db54c6de80..2ac515ab190 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -56,6 +57,11 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { delegator.init(context); } + @Override + public void peerConfigUpdated(ReplicationPeerConfig rpc){ + + } + @Override public boolean replicate(ReplicateContext replicateContext) { if (!delegator.canReplicateToSameCluster()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java index e7bd72c51ea..a56276dfd74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java @@ -18,6 +18,7 @@ import static org.junit.Assert.fail; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.TestReplicationBase; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -195,4 +198,59 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase { adminExt.disableTableRep(tableName); } } + + @Test(timeout=300000) + public void testReplicationPeerConfigUpdateCallback() throws Exception { + String peerId = "1"; + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName()); + rpc.getConfiguration().put("key1", "value1"); + + admin.addPeer(peerId, rpc); + admin.peerAdded(peerId); + + rpc.getConfiguration().put("key1", "value2"); + admin.updatePeerConfig(peerId, rpc); + if (!TestUpdatableReplicationEndpoint.hasCalledBack()) { + synchronized(TestUpdatableReplicationEndpoint.class) { + TestUpdatableReplicationEndpoint.class.wait(2000L); + } + } + + assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack()); + } + + public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint { + private static boolean calledBack = false; + public static boolean hasCalledBack(){ + return calledBack; + } + @Override + public synchronized void peerConfigUpdated(ReplicationPeerConfig rpc){ + calledBack = true; + notifyAll(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + + + @Override + public UUID getPeerUUID() { + return UUID.randomUUID(); + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + return false; + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index f441a99fa22..e91a4f77d84 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -92,8 +92,9 @@ module Hbase table_cfs.each{|key, val| map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) } + replication_peer_config.set_table_cfs_map(map) end - @replication_admin.add_peer(id, replication_peer_config, map) + @replication_admin.add_peer(id, replication_peer_config) else raise(ArgumentError, "args must be a Hash") end @@ -202,5 +203,31 @@ module Hbase def get_peer_config(id) @replication_admin.get_peer_config(id) end + + def peer_added(id) + @replication_admin.peer_added(id) + end + + def update_peer_config(id, args={}) + # Optional parameters + config = args.fetch(CONFIG, nil) + data = args.fetch(DATA, nil) + + # Create and populate a ReplicationPeerConfig + replication_peer_config = ReplicationPeerConfig.new + unless config.nil? + replication_peer_config.get_configuration.put_all(config) + end + + unless data.nil? + # Convert Strings to Bytes for peer_data + peer_data = replication_peer_config.get_peer_data + data.each{|key, val| + peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val)) + } + end + + @replication_admin.update_peer_config(id, replication_peer_config) + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index e5c9a317e81..adcd8f289dd 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -365,6 +365,7 @@ Shell.load_command_group( disable_table_replication get_peer_config list_peer_configs + update_peer_config ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb new file mode 100644 index 00000000000..5d721fd706a --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/update_peer_config.rb @@ -0,0 +1,49 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class UpdatePeerConfig< Command + def help + return <<-EOF +A peer can either be another HBase cluster or a custom replication endpoint. In either case an id +must be specified to identify the peer. This command does not interrupt processing on an enabled replication peer. + +Two optional arguments are DATA and CONFIG which can be specified to set different values for either +the peer_data or configuration for a custom replication endpoint. Any existing values not updated by this command +are left unchanged. + +CLUSTER_KEY, REPLICATION_ENDPOINT, and TABLE_CFs cannot be updated with this command. +To update TABLE_CFs, see the append_peer_tableCFs and remove_peer_tableCFs commands. + + hbase> update_peer_config '1', DATA => { "key1" => 1 } + hbase> update_peer_config '2', CONFIG => { "config1" => "value1", "config2" => "value2" } + hbase> update_peer_config '3', DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }, + + EOF + end + + def command(id, args = {}) + format_simple_command do + replication_admin.update_peer_config(id, args) + end + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 8f08dc0500c..0c026d6ca13 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -200,6 +200,31 @@ module Hbase replication_admin.remove_peer(peer_id_second) end + define_test "update_peer_config: can update peer config and data" do + repl_impl = "org.apache.hadoop.hbase.replication.ReplicationEndpointForTest" + config_params = { "config1" => "value1", "config2" => "value2" } + data_params = {"data1" => "value1", "data2" => "value2"} + args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params} + replication_admin.add_peer(@peer_id, args) + + #Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here we have to do it ourselves + replication_admin.peer_added(@peer_id) + + new_config_params = { "config1" => "new_value1" } + new_data_params = {"data1" => "new_value1"} + new_args = {CONFIG => new_config_params, DATA => new_data_params} + replication_admin.update_peer_config(@peer_id, new_args) + + #Make sure the updated key/value pairs in config and data were successfully updated, and that those we didn't + #update are still there and unchanged + peer_config = replication_admin.get_peer_config(@peer_id) + replication_admin.remove_peer(@peer_id) + assert_equal("new_value1", peer_config.get_configuration.get("config1")) + assert_equal("value2", peer_config.get_configuration.get("config2")) + assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1")))) + assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2")))) + + end # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279 # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below. # define_test "add_peer: adding a second peer with same id should error" do