StormDRPC 概念以及简单测试

StormDRPC 概念以及简单测试

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPCstorm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。

 

DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spoutbolt, topology而成的一种模式(pattern)


Storm DRPC工作机制

Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)

DRPC服务器协调

1) 接收一个RPC请求。

2) 发送请求到storm topology 

3) storm topology接收结果。

4) 把结果发回给等待的客户端。从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别。比如下面是客户端如何调用RPCexclaimation方法的,方法的参数是: hello

 

DRPCClient client = new DRPCClient(“drpc-host”, 3772);

String result = client.execute(“exclaimation“, “hello“);

 

 

DRPC的工作流大致是这样的:

 

 

客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpoutDRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResultsbolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

 

LinearDRPCTopologyBuilder

 

Storm自带了一个称作LinearDRPCTopologyBuildertopology builder, 它把实现DRPC的几乎所有步骤都自动化了,包括:

设置spout

把结果返回给DRPC服务器

代码示例:

Blot:

  1. package com.billstudy.storm.drpc;
  2. import backtype.storm.topology.BasicOutputCollector;
  3. import backtype.storm.topology.OutputFieldsDeclarer;
  4. import backtype.storm.topology.base.BaseBasicBolt;
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. /**
  9. * 将用户输入的字符串,加1后输出
  10. * @author Bill
  11. * @since V1.0 2015年7月7日 - 上午10:47:59
  12. */
  13. public class DRPCBlotOf1 extends BaseBasicBolt{
  14. private static final long serialVersionUID = 1L;
  15. /**
  16. * 往下走的时候,需要把DRPC的元数据带上。
  17. * 在tupule的第一个元素中
  18. */
  19. @Override
  20. public void execute(Tuple input, BasicOutputCollector collector) {
  21. Object drpcMeta = input.getValue(0); // 元数据
  22. String clientData = input.getString(1); // 客户端传递来的数据
  23. collector.emit(new Values(drpcMeta,clientData + "1"));
  24. }
  25. @Override
  26. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  27. declarer.declare(new Fields("id","result"));
  28. }
  29. }
Driver:
  1. package com.billstudy.storm.drpc;
  2. import backtype.storm.Config;
  3. import backtype.storm.LocalCluster;
  4. import backtype.storm.LocalDRPC;
  5. import backtype.storm.StormSubmitter;
  6. import backtype.storm.drpc.LinearDRPCTopologyBuilder;
  7. /**
  8. * DPRC Driver
  9. * @author Bill
  10. * @since V1.0 2015年7月7日 - 上午10:55:52
  11. */
  12. public class DRPCMain {
  13. @SuppressWarnings("deprecation")
  14. public static void main(String[] args) throws Exception{
  15. // 使用DRPC
  16. LinearDRPCTopologyBuilder drpcTopologyBuilder = new LinearDRPCTopologyBuilder("drpc-test");
  17. drpcTopologyBuilder.addBolt(new DRPCBlotOf1(),3);
  18. Config config = new Config();
  19. if(args == null || args.length == 0){
  20. LocalDRPC drpc = new LocalDRPC();
  21. LocalCluster cluster = new LocalCluster();
  22. cluster.submitTopology("drpc-demo", config, drpcTopologyBuilder.createRemoteTopology());
  23. for (String word : new String[]{"hello","world","rpc"}) {
  24. System.out.println("result : " + word + " : " + drpc.execute("drpc-test", word));
  25. }
  26. }else {
  27. config.setNumWorkers(3);
  28. StormSubmitter.submitTopology(args[0], config, drpcTopologyBuilder.createRemoteTopology());
  29. }
  30. }
  31. }
Storm集群需要配置以下:
注意drpc的部分
  1. # with the License. You may obtain a copy of the License at
  2. #
  3. # http://www.apache.org/licenses/LICENSE-2.0
  4. #
  5. # Unless required by applicable law or agreed to in writing, software
  6. # distributed under the License is distributed on an "AS IS" BASIS,
  7. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  8. # See the License for the specific language governing permissions and
  9. # limitations under the License.
  10. ########### These MUST be filled in for a storm configuration
  11. storm.zookeeper.servers:
  12. - "v5"
  13. - "v6"
  14. - "v7"
  15. #
  16. nimbus.host: "v5"
  17. #supervisor.slots.ports
  18. #-8801
  19. #-8802
  20. #-8803
  21. #-8804
  22. #-8805
  23. #-8806
  24. #
  25. #
  26. # ##### These may optionally be filled in:
  27. #
  28. ## List of custom serializations
  29. # topology.kryo.register:
  30. # - org.mycompany.MyType
  31. # - org.mycompany.MyType2: org.mycompany.MyType2Serializer
  32. #
  33. ## List of custom kryo decorators
  34. # topology.kryo.decorators:
  35. # - org.mycompany.MyDecorator
  36. #
  37. ## Locations of the drpc servers
  38. drpc.servers:
  39. - "v5"
  40. # - "server2"
然后配置文件分发各个子节点,启动nimbus,drpc,supervisor就可以测试了
  1. scp /root/storm/conf/storm.yaml v6:$PWD
  2. scp /root/storm/conf/storm.yaml v7:$PWD
  3. storm nimbus &
  4. storm drpc &
  5. storm supervisor & #这句需要在v6,v7上面执行,上面两句在v5上面执行就好了
  6. #提交打好的jar
  7. storm jar DRPC.jar com.billstudy.storm.drpc.DRPCMain drpc-test-task
  8. #可以到v5:8080上面看到任务提交成功了,就可以启动DRPCClient测试了
DRPCClient程序 通过Storm drpc服务监听的3772端口进行通讯
execute执行有两个参数:
    1:drpc-test这是创建LinearDRPCTopologyBuilder时指定的名称
  2: Bill 这是传入的参数

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注