RabbitMQ RPC java实现
1、搭建RabbitMQ 服务,设置相关参数,本文总线服务已提前安装,再次不在赘述相关步骤。
2、在Eclispe中创建RPC服务端项目RPCServerTest.服务端代码:RP觊皱筠桡CServer .javapackagecom.cn.trap.test.rpc;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.QueueingConsumer;importcom.rabbitmq.client.AMQP.BasicProperties;publicclassRPCServer{privatestaticfinalStringRPC_QUEUE_NAME="rpc_queue";privatestaticintfib(intn){if(n==0)return0;if(n==1)return1;returnfib(n-1)+fib(n-2);}publicstaticvoidmain(String[]argv){Connectionconnection=null;Channelchannel=null;try{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("192.168.100.17");factory.setPassword("admin");factory.setUsername("admin");factory.setPort(5677);connection=factory.newConnection();channel=connection.createChannel();channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);channel.basicQos(1);QueueingConsumerconsumer=newQueueingConsumer(channel);channel.basicConsume(RPC_QUEUE_NAME,false,consumer);System.out.println(" 等待请求-------");while(true){Stringresponse=null;QueueingConsumer.Deliverydelivery=consumer.nextDelivery();BasicPropertiesprops=delivery.getProperties();BasicPropertiesreplyProps=newBasicProperties.Builder().correlationId(props.getCorrelationId()).build();try{Stringmessage=newString(delivery.getBody(),"UTF-8");intn=Integer.parseInt(message);System.out.println("[.]fib("+message+")");response=""+fib(n);}catch(Exceptione){System.out.println("[.]"+e.toString());response="";}finally{channel.basicPublish("",props.getReplyTo(),replyProps,response.getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}}catch(Exceptione){e.printStackTrace();}finally{if(connection!=null){try{connection.close();}catch(Exceptionignore){}}}}}
3、客户端代码:RPCClient.javapackagecom.cn.trap.test.rpc;importcom.rabbitmq.艘早祓胂client.ConnectionFactory;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.QueueingConsumer;importcom.rabbitmq.client.AMQP.BasicProperties;importjava.util.UUID;publicclassRPCClient{privateConnectionconnection;privateChannelchannel;privateStringrequestQueueName="rpc_queue";privateStringreplyQueueName;privateQueueingConsumerconsumer;publicRPCClient()throwsException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost("192.168.100.17"); //RabbitMQ 服务地址factory.setPassword("admin");factory.setUsername("admin");factory.setPort(5677);connection=factory.newConnection();channel=connection.createChannel();replyQueueName=channel.queueDeclare().getQueue();consumer=newQueueingConsumer(channel);channel.basicConsume(replyQueueName,true,consumer);}publicStringcall(Stringmessage)throwsException{Stringresponse=null;StringcorrId=UUID.randomUUID().toString();BasicPropertiesprops=newBasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();channel.basicPublish("",requestQueueName,props,message.getBytes("UTF-8"));while(true){QueueingConsumer.Deliverydelivery=consumer.nextDelivery();if(delivery.getProperties().getCorrelationId().equals(corrId)){response=newString(delivery.getBody(),"UTF-8");break;}}returnresponse;}publicvoidclose()throwsException{connection.close();}publicstaticvoidmain(String[]argv){RPCClientfibonacciRpc=null;Stringresponse=null;try{fibonacciRpc=newRPCClient();System.out.println("[x]Requestingfib(30)");response=fibonacciRpc.call("30");System.out.println("[.]Got'"+response+"'");}catch(Exceptione){e.printStackTrace();}finally{if(fibonacciRpc!=null){try{fibonacciRpc.close();}catch(Exceptionignore){}}}}}