RabbitMQ RPC java实现

2025-12-21 01:46:13

1、搭建RabbitMQ 服务,设置相关参数。本文所用的总线服务已提前安装,在此不再赘述相关步骤。

RabbitMQ RPC java实现

2、在Eclipse中创建RPC服务端项目RPCServerTest。

服务端代码:RPCServer.java

package com.cn.trap.test.rpc;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {

  private static final String RPC_QUEUE_NAME = "rpc_queue";

  private static int fib(int n) {

    if (n ==0) return 0;

    if (n == 1) return 1;

    return fib(n-1) + fib(n-2);

  }

  public static void main(String[] argv) {

    Connection connection = null;

    Channel channel = null;

    try {

      ConnectionFactory factory = new ConnectionFactory();

      factory.setHost("192.168.100.17");

      factory.setPassword("admin");

      factory.setUsername("admin");

      factory.setPort(5677);

      connection = factory.newConnection();

      channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      QueueingConsumer consumer = new QueueingConsumer(channel);

      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

      System.out.println(" 等待请求-------");

      while (true) {

        String response = null;

        QueueingConsumer.Delivery delivery = consumer.nextDelivery();

        BasicProperties props = delivery.getProperties();

        BasicProperties replyProps = new BasicProperties

                                         .Builder()

                                         .correlationId(props.getCorrelationId())

                                         .build();

        try {

          String message = new String(delivery.getBody(),"UTF-8");

          int n = Integer.parseInt(message);

          System.out.println(" [.] fib(" + message + ")");

          response = "" + fib(n);

        }

        catch (Exception e){

          System.out.println(" [.] " + e.toString());

          response = "";

        }

        finally {  

          channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));

          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }

      }

    }

    catch  (Exception e) {

      e.printStackTrace();

    }

    finally {

      if (connection != null) {

        try {

          connection.close();

        }

        catch (Exception ignore) {}

      }

    }                    

  }

}

RabbitMQ RPC java实现

RabbitMQ RPC java实现

3、客户端代码:RPCClient.java

package com.cn.trap.test.rpc;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.AMQP.BasicProperties;

import java.util.UUID;

public class RPCClient {

  private Connection connection;

  private Channel channel;

  private String requestQueueName = "rpc_queue";

  private String replyQueueName;

  private QueueingConsumer consumer;

  public RPCClient() throws Exception {

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("192.168.100.17");   //RabbitMQ 服务地址

    factory.setPassword("admin");

    factory.setUsername("admin");

    factory.setPort(5677);

    connection = factory.newConnection();

    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();

    consumer = new QueueingConsumer(channel);

    channel.basicConsume(replyQueueName, true, consumer);

  }

  public String call(String message) throws Exception {

    String response = null;

    String corrId = UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties

                                .Builder()

                                .correlationId(corrId)

                                .replyTo(replyQueueName)

                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    while (true) {

      QueueingConsumer.Delivery delivery = consumer.nextDelivery();

      if (delivery.getProperties().getCorrelationId().equals(corrId)) {

        response = new String(delivery.getBody(),"UTF-8");

        break;

      }

    }

    return response;

  }

  public void close() throws Exception {

    connection.close();

  }

  public static void main(String[] argv) {

    RPCClient fibonacciRpc = null;

    String response = null;

    try {

      fibonacciRpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");

      response = fibonacciRpc.call("30");

      System.out.println(" [.] Got '" + response + "'");

    }

    catch  (Exception e) {

      e.printStackTrace();

    }

    finally {

      if (fibonacciRpc!= null) {

        try {

          fibonacciRpc.close();

        }

        catch (Exception ignore) {}

      }

    }

  }

}

声明:本网站引用、摘录或转载内容仅供网站访问者交流或参考,不代表本站立场,如存在版权或非法内容,请联系站长删除,联系邮箱:site.kefu@qq.com。
相关推荐
  • 阅读量:186
  • 阅读量:28
  • 阅读量:164
  • 阅读量:49
  • 阅读量:41
  • 猜你喜欢