StormDRPC 概念以及简单例子测试
Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。
DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern)。
Storm DRPC工作机制
Distributed RPC是由一个”DPRC Server”协调的(storm自带了一个实现)。
DRPC服务器协调
1) 接收一个RPC请求。
2) 发送请求到storm topology
3) 从storm topology接收结果。
4) 把结果发回给等待的客户端。从客户端的角度来看一个DRPC调用跟一个普通的RPC调用没有任何区别。比如下面是客户端如何调用RPC:exclaimation方法的,方法的参数是: hello
DRPCClient client = new DRPCClient(“drpc-host”, 3772); String result = client.execute(“exclaimation”, “hello”);
|
DRPC的工作流大致是这样的:
客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。
LinearDRPCTopologyBuilder
Storm自带了一个称作LinearDRPCTopologyBuilder的topology builder, 它把实现DRPC的几乎所有步骤都自动化了,包括:
设置spout
把结果返回给DRPC服务器
代码示例:
Blot:
package com.billstudy.storm.drpc; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 将用户输入的字符串,加1后输出 * @author Bill * @since V1.0 2015年7月7日 - 上午10:47:59 */ public class DRPCBlotOf1 extends BaseBasicBolt{ private static final long serialVersionUID = 1L; /** * 往下走的时候,需要把DRPC的元数据带上。 * 在tupule的第一个元素中 */ @Override public void execute(Tuple input, BasicOutputCollector collector) { Object drpcMeta = input.getValue(0); // 元数据 String clientData = input.getString(1); // 客户端传递来的数据 collector.emit(new Values(drpcMeta,clientData + "1")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","result")); } }
Driver:
package com.billstudy.storm.drpc; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.StormSubmitter; import backtype.storm.drpc.LinearDRPCTopologyBuilder; /** * DPRC Driver * @author Bill * @since V1.0 2015年7月7日 - 上午10:55:52 */ public class DRPCMain { @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception{ // 使用DRPC LinearDRPCTopologyBuilder drpcTopologyBuilder = new LinearDRPCTopologyBuilder("drpc-test"); drpcTopologyBuilder.addBolt(new DRPCBlotOf1(),3); Config config = new Config(); if(args == null || args.length == 0){ LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", config, drpcTopologyBuilder.createRemoteTopology()); for (String word : new String[]{"hello","world","rpc"}) { System.out.println("result : " + word + " : " + drpc.execute("drpc-test", word)); } }else { config.setNumWorkers(3); StormSubmitter.submitTopology(args[0], config, drpcTopologyBuilder.createRemoteTopology()); } } }
# with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ########### These MUST be filled in for a storm configuration storm.zookeeper.servers: - "v5" - "v6" - "v7" # nimbus.host: "v5" #supervisor.slots.ports #-8801 #-8802 #-8803 #-8804 #-8805 #-8806 # # # ##### These may optionally be filled in: # ## List of custom serializations # topology.kryo.register: # - org.mycompany.MyType # - org.mycompany.MyType2: org.mycompany.MyType2Serializer # ## List of custom kryo decorators # topology.kryo.decorators: # - org.mycompany.MyDecorator # ## Locations of the drpc servers drpc.servers: - "v5" # - "server2"
然后配置文件分发各个子节点,启动nimbus,drpc,supervisor就可以测试了
scp /root/storm/conf/storm.yaml v6:$PWD scp /root/storm/conf/storm.yaml v7:$PWD storm nimbus & storm drpc & storm supervisor & #这句需要在v6,v7上面执行,上面两句在v5上面执行就好了 #提交打好的jar storm jar DRPC.jar com.billstudy.storm.drpc.DRPCMain drpc-test-task #可以到v5:8080上面看到任务提交成功了,就可以启动DRPCClient测试了
版权声明:本文为博主原创文章,未经博主允许不得转载。