Vertx基于EventBus发送接受自定义对象
#代码知识 发布时间: 2026-01-12
先看官方文档步骤:
需要一个编解码器,看源码:
可见内置了需要数据类型的实现,所以发送其他消息可以发送,但是如果发送自定义对象就需要自己实现编解码逻辑了
一 自定义编解码器
/**
* 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
*/
public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> {
/**
* 将消息实体封装到Buffer用于传输
* 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer
*/
@Override
public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {
final ByteArrayOutputStream b = new ByteArrayOutputStream();
try (ObjectOutputStream o = new ObjectOutputStream(b)){
o.writeObject(orderMessage);
o.close();
buffer.appendBytes(b.toByteArray());
} catch (IOException e) { e.printStackTrace(); }
}
//从Buffer中获取消息对象
@Override
public OrderMessage decodeFromWire(int pos, Buffer buffer) {
final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
OrderMessage msg = null;
try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();
} catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }
return msg;
}
//消息转换
@Override
public OrderMessage transform(OrderMessage orderMessage) {
System.out.println("消息转换---");//可对接受消息进行转换,比如转换成另一个对象等
orderMessage.setName("姚振");
return orderMessage;
}
@Override
public String name() { return "myCodec"; }
//识别是否是用户自定义编解码器,通常为-1
@Override
public byte systemCodecID() { return -1; }
public static MessageCodec create() {
return new CustomizeMessageCodec();
}
}
这里有一个点要注意,nam方法是必须的,且发送的时候一定要指明name
二 发送消息编写
public class ProducerVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
EventBus eventBus = vertx.eventBus();
//发布消息(群发)
eventBus.publish("com.hou", "群发祝福!");
//发送消息(单发),只会发送注册此地址的一个,采用不严格的轮询算法选择
DeliveryOptions options = new DeliveryOptions();//设置消息头等
options.addHeader("some-header", "some-value");
eventBus.send("com.hou", "单发消息",options,ar->{
if(ar.succeeded()) System.out.println("收到消费者确认信息:"+ar.result().body());
});
//发送自定义对象,需要编解码器
eventBus.registerCodec(CustomizeMessageCodec.create());//注册编码器
DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必须指定名字
OrderMessage orderMessage = new OrderMessage();
orderMessage.setName("侯征");
eventBus.send("com.hou", orderMessage, options1);
}
}
三 接受消息Verticle编写
public class ConsumerVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
//每个Vertx实例默认是单例
EventBus eb = vertx.eventBus();
//注册处理器,消费com.hou发送的消息
MessageConsumer<Object> consumer = eb.consumer("com.hou");//订阅地址
consumer.handler(message -> {//消息处理器
if(message.body() instanceof OrderMessage){
System.out.println("接受到对象: " + ((OrderMessage) message.body()).getName());
}
System.out.println("我是普通消费者: " + message.body());
message.reply("收到了!"); // 回复生产者,send才能接受
}).completionHandler(res -> {//注册完成后通知事件,适用于集群中比较慢的情况下
System.out.println("注册处理器结果"+res.succeeded());
});
//撤销处理器
//consumer.unregister();
}
}
四 注册部署Verticcle
vertx.deployVerticle(ConsumerVerticle.class.getName());
TimeUnit.SECONDS.sleep(1);
vertx.deployVerticle(ProducerVerticle.class.getName());
五 测试
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
代码知识SEO上一篇 : Java-JFrame窗体美化方式
下一篇 : C语言实现贪吃蛇代码
-
SEO外包最佳选择国内专业的白帽SEO机构,熟知搜索算法,各行业企业站优化策略!
SEO公司
-
可定制SEO优化套餐基于整站优化与品牌搜索展现,定制个性化营销推广方案!
SEO套餐
-
SEO入门教程多年积累SEO实战案例,从新手到专家,从入门到精通,海量的SEO学习资料!
SEO教程
-
SEO项目资源高质量SEO项目资源,稀缺性外链,优质文案代写,老域名提权,云主机相关配置折扣!
SEO资源
-
SEO快速建站快速搭建符合搜索引擎友好的企业网站,协助备案,域名选择,服务器配置等相关服务!
SEO建站
-
快速搜索引擎优化建议没有任何SEO机构,可以承诺搜索引擎排名的具体位置,如果有,那么请您多注意!专业的SEO机构,一般情况下只能确保目标关键词进入到首页或者前几页,如果您有相关问题,欢迎咨询!