在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:rpush开源软件地址:https://gitee.com/shuangmulin/rpush开源软件介绍:rpush:多平台统一消息推送系统
在线体验http://159.75.121.163/admin admin 目前支持的消息类型
本地如何快速部署一个简单的体验版本地数据库执行
启动完成之后,浏览器打开 效果展示单个消息类型发送示例web端多平台发送示例postman多平台发送示例用代码发消息秉持”业务服务只负责发消息“的解耦原则,业务服务在需要发消息的时候,代码应该越简单越好。所以,Rpush的发消息的sdk,一种消息只需要一行代码,有几种消息就有几行代码。比如这样: /** * @author shuangmulin * @since 2021/6/8/008 11:37 **/public class RpushSenderTest { /** * 要发送的内容 */ public static final String content = "您的会议室已经预定 \n" + ">**事项详情** \n" + ">事 项:开会\n" + ">组织者:@miglioguan \n" + ">参与者:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" + "> \n" + ">会议室:广州TIT 1楼 301\n" + ">日 期:2021年5月18日\n" + ">时 间:上午9:00-11:00\n" + "> \n" + ">请准时参加会议。 \n" + "> \n" + ">如需修改会议信息,请点击:[修改会议信息](https://work.weixin.qq.com)"; public static void main(String[] args) { // 企业微信-markdown消息 MarkdownMessageDTO markdown = RpushMessage.WECHAT_WORK_AGENT_MARKDOWN().content(content).receiverIds(Collections.singletonList("ZhongBaoLin")).build(); // 企业微信-群机器人消息 TextMessageDTO text = RpushMessage.WECHAT_WORK_ROBOT_TEXT().content(content).receiverIds(Collections.singletonList("ZhongBaoLin")).build(); // 邮箱 EmailMessageDTO email = RpushMessage.EMAIL().title("会议通知").content(content).build(); RpushService.instance("baolin", "666666").sendMessage(markdown, text, email); // 填上账号密码,运行即可 }} 以上代码,一次发送了三种不同平台的不同消息类型,全部代码加起来也只需要四五行代码而已。要获得以上效果,只需要maven引用rpush的sdk模块即可: <project> <!-- 设置 jitpack.io 仓库 --> <repositories> <repository> <id>jitpack.io</id> <url>https://jitpack.io</url> </repository> </repositories> <dependencies> <!-- 添加rpush-sdk依赖 --> <dependency> <groupId>com.github.shuangmulin.rpush</groupId> <artifactId>rpush-sdk</artifactId> <version>v1.0.2</version> </dependency> </dependencies></project> 即时通讯Rpush对即时通讯的实现方式比较 一些比较核心的扩展点1. 可自由扩展的消息平台和消息类型在Rpush的设计里,消息被归类为“消息平台”和“消息类型”,分别对应如下两个枚举: /** * 消息平台枚举 **/public enum MessagePlatformEnum { EMAIL(EmailConfig.class, "邮箱", "", "^[_a-z0-9-]+(\\.[_a-z0-9-]+)*@[a-z0-9-]+(\\.[a-z0-9-]+)*(\\.[a-z]{2,})$", true), WECHAT_WORK_AGENT(WechatWorkAgentConfig.class, "企业微信-应用消息", "", "", true), WECHAT_WORK_ROBOT(WechatWorkRobotConfig.class, "企业微信-群机器人", "", "", true), WECHAT_OFFICIAL_ACCOUNT(WechatOfficialAccountConfig.class, "微信公众号", "", "", true), DING_TALK_CORP(DingTalkCorpConfig.class, "钉钉-工作通知", "", "", true), RPUSH_SERVER(EmptyConfig.class, "rpush服务", "", "", true);}/** * 消息类型枚举 **/public enum MessageType { EMAIL("普通邮件 ", MessagePlatformEnum.EMAIL), RPUSH_SERVER("文本", MessagePlatformEnum.RPUSH_SERVER), // ================================企业微信-应用==================================== WECHAT_WORK_AGENT_TEXT("文本", MessagePlatformEnum.WECHAT_WORK_AGENT), WECHAT_WORK_AGENT_IMAGE("图片", MessagePlatformEnum.WECHAT_WORK_AGENT), WECHAT_WORK_AGENT_VIDEO("视频", MessagePlatformEnum.WECHAT_WORK_AGENT), WECHAT_WORK_AGENT_FILE("文件", MessagePlatformEnum.WECHAT_WORK_AGENT), WECHAT_WORK_AGENT_TEXTCARD("文本卡片", MessagePlatformEnum.WECHAT_WORK_AGENT), WECHAT_WORK_AGENT_NEWS("图文消息", MessagePlatformEnum.WECHAT_WORK_AGENT), WECHAT_WORK_AGENT_MARKDOWN("Markdown", MessagePlatformEnum.WECHAT_WORK_AGENT), // ================================企业微信-群机器人==================================== WECHAT_WORK_ROBOT_TEXT("文本", MessagePlatformEnum.WECHAT_WORK_ROBOT), WECHAT_WORK_ROBOT_IMAGE("图片", MessagePlatformEnum.WECHAT_WORK_ROBOT), WECHAT_WORK_ROBOT_NEWS("图文消息", MessagePlatformEnum.WECHAT_WORK_ROBOT), WECHAT_WORK_ROBOT_MARKDOWN("Markdown", MessagePlatformEnum.WECHAT_WORK_ROBOT), // ================================微信公众号==================================== WECHAT_OFFICIAL_ACCOUNT_TEXT("文本", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT), WECHAT_OFFICIAL_ACCOUNT_NEWS("图文消息", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT), WECHAT_OFFICIAL_ACCOUNT_TEMPLATE("模板消息", MessagePlatformEnum.WECHAT_OFFICIAL_ACCOUNT), // ================================钉钉-工作通知==================================== DING_TALK_COPR_TEXT("文本", MessagePlatformEnum.DING_TALK_CORP), DING_TALK_COPR_MARKDOWN("Markdown", MessagePlatformEnum.DING_TALK_CORP), DING_TALK_COPR_LINK("链接消息", MessagePlatformEnum.DING_TALK_CORP), DING_TALK_COPR_ACTION_CARD_SINGLE("卡片-单按钮", MessagePlatformEnum.DING_TALK_CORP), DING_TALK_COPR_ACTION_CARD_MULTI("卡片-多按钮", MessagePlatformEnum.DING_TALK_CORP), DING_TALK_COPR_OA("OA消息", MessagePlatformEnum.DING_TALK_CORP), ;} 这里拿“企业微信-应用的文本类型”的消息举例。假设现在要在Rpush实现这个类型的消息,步骤如下:
/** * 企业微信配置 **/@EqualsAndHashCode(callSuper = true)@Data@NoArgsConstructor@AllArgsConstructor@Builderpublic class WechatWorkAgentConfig extends Config { private static final long serialVersionUID = -9206902816158196669L; @ConfigValue(value = "企业ID", description = "在此页面查看:https://work.weixin.qq.com/wework_admin/frame#profile") private String corpId; @ConfigValue(value = "应用Secret") private String secret; @ConfigValue(value = "应用agentId") private Integer agentId;} 里面的字段就按对应平台需要的字段去定义就行,比如这里的企业微信就只有三个字段需要配置。而每个字段上的
/** * 企业微信消息发送DTO **/@EqualsAndHashCode(callSuper = true)@Data@SuperBuilder@NoArgsConstructor@AllArgsConstructorpublic class TextMessageDTO extends BaseMessage { private static final long serialVersionUID = -3289428483627765265L; /** * 接收人分组列表 */ @SchemeValue(type = SchemeValueType.RECEIVER_GROUP) private List<Long> receiverGroupIds; /** * 接收人列表 */ @SchemeValue(type = SchemeValueType.RECEIVER) private List<String> receiverIds; @SchemeValue(description = "PartyID列表,非必填,多个接受者用‘|’分隔。当touser为@all时忽略本参数") private String toParty; @SchemeValue(description = "TagID列表,非必填,多个接受者用‘|’分隔。当touser为@all时忽略本参数") private String toTag; @SchemeValue(type = SchemeValueType.TEXTAREA, description = "请输入内容...") private String content;} 同样的,里面的字段根据该消息类型需要的字段去定义就行。比如企业微信-应用-文本消息就只需要一个
/** * 企业微信文本消息handler **/@Componentpublic class AgentTextMessageHandler extends MessageHandler<TextMessageDTO> { @Override public MessageType messageType() { return MessageType.WECHAT_WORK_AGENT_TEXT; } @Override public void handle(TextMessageDTO param) { // 具体的发消息代码 }} 这里有以下需要关心的点:
到这里,就不需要多做任何其它的事了。也就是说,做完以上四个步骤,就已经完成了一个消息类型的扩展。事做的少,获得的功能并不少:
2. 可自由扩展的即时通讯实现在rpush的架构里,投递一个消息的流程大致可以概括为:调用统一的接口向路由服务投递消息-》路由服务查出消息目标所在的服务器地址-》路由服务向对应的服务器传递消息-》对应的服务找到对应的会话发送消息。这里的扩展点在最后一步,即用户和服务器的会话维护。要实现服务端向客户端推送消息,会有比较多的解决方案,比如用netty起一个nio服务器,客户端去连netty服务器或者服务端用socketio提供websocket实现,客户端按websocket的方式连服务器或者用comet实现长连接让客户端连等等。
RpushClient不管是什么技术实现的服务端推送,都会有一个“客户端”性质的类,比如netty会有 /** * 客户端 **/public interface RpushClient { /** * 推送消息 */ void pushMessage(NormalMessageDTO message); void close();} 只要实现了这个接口的类,不管是什么技术的实现,都被认为是rpush的客户端。也就是说,netty也好,websocket也好,只要提供给rpush这个接口的能力即可,从而达到解耦具体实现的目的。目前rpush已经做了netty和socketio两个实现,分别对应 netty客户端sdkrpush提供了netty对应的客户端的sdk,项目依赖 public class Main { public static void main(String[] args) { RpushClient rpushClient = new RpushClient(servicePath, registrationId); // 填上rpush服务地址和id rpushClient.addMsgProcessor(new PingIgnoreMsgProcessor()); // 忽略心跳消息 rpushClient.start(); // 向服务端发起连接 rpushClient.addMsgProcessor(msg -> { // 处理接收到的消息 return false; }); }} 关于架构rpush目前主要提供两大功能,一个是消息分发,另一个是即时通讯功能。消息分发由路由服务 1. 可自由集群的路由服务为了保证消息投递的统一性以及解耦消息分发和即时通讯之间的关系,路由服务只做一件事,即 2. 可自由集群的socket服务
客户端基于以上步骤上线之后,其他客户端向该客户端投递消息的流程为:
实现以上流程之后,socket服务就可以做到自由集群了。 上面说的流程偏理论化,有几个技术实现点这里做一下详细说明:
实现的手段其实非常的简单暴力。首先由socket服务提供一个查询本机ip和端口的接口,路由服务直接通过ribbon去请求这个接口,然后自定义一个负载均衡规则类,来实现socket服务的选择: /** * 路由->Socket服务端请求的实例选择 */public class ServerBalancer extends ZoneAvoidanceRule { @Override public Server choose(Object o) { // ... // 用默认的负载均衡算法选出一个可用的socket服务(这里的算法可以根据实际业务更改) return super.choose(o); // ... }} 在配置文件里配置这个“规则类”: rpush-server: ribbon: NFLoadBalancerRuleClassName: com.regent.rpush.route.loadbalancer.ServerBalancer 路由服务在向socket服务请求的时候会”经过“这个”规则类“,然后由这个“规则类”来选出一个可用的socket服务。最终socket服务的端口和ip信息,也是由选中的socket服务通过这次请求返回给路由服务的。当然这个规则类不是只做这一件事,还有一个问题也需要这个类来完成。
首先,在客户端与某一个socket服务连接成之后,客户端与socket服务之间的关系需要保存起来(mysql或redis)。然后新增一个feign的请求拦截器( @Componentpublic class MessageRequestInterceptor implements RequestInterceptor { /** * 存放本次消息投递的目标socket服务id */ static final ThreadLocal<String> SERVER_ID = new ThreadLocal<>(); @Autowired private IRpushServerOnlineService rpushServerOnlineService; @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") @Override public void apply(RequestTemplate requestTemplate) { String url = requestTemplate.url(); String method = requestTemplate.method(); if (!"/push".equals(url) || !"POST".equals(method)) { // 只处理消息投递接口 return; } // 如果是消息推送,需要给接收端连接的服务端投放消息,在服务端集群的情况下,要找到对应的服务端 String body = new String(requestTemplate.body()); JSONObject jsonObject = new JSONObject(body); String sendTo = jsonObject.getStr("sendTo"); // 拿到目标客户端的id String serverId = ""; // 从redis或mysql查到该客户端对应的socket服务id SERVER_ID.set(serverId); // 添加到当前线程里 }} 这个“拦截类”配合上面的”规则类“,就能在路由服务向socket服务传递消息时准确的找到对应的socket服务。 完整的”规则类“: /** * 路由->Socket服务端请求的实例选择 */public class ServerBalancer extends ZoneAvoidanceRule { @Override public Server choose(Object o) { try { // 从拦截类里看有没有指定服务端实例 String serverId = MessageRequestInterceptor.SERVER_ID.get(); if (StringUtils.isEmpty(serverId)) { // 如果没有指定服务端实例,用默认的负载均衡算法 return super.choose(o); } // 如果指定了服务端实例,说明是消息传递,用指定好的实例向socket服务发请求 List<Server> servers = getLoadBalancer().getAllServers(); for (Server server : servers) { if (StringUtils.equals(server.getId(), serverId)) { return server; } } throw new IllegalArgumentException("没有可用的RPUSH_SERVER实例"); } finally { MessageRequestInterceptor.SERVER_ID.remove(); } }} 而且有了这两个类,路由服务向socket服务传递消息的代码也会非常的”干净“: @Componentpublic class RpushMessageHandler extends MessageHandler<RpushMessageDTO> { // ... @Override public void handle(RpushMessageDTO param) { List<String> sendTos = param.getReceiverIds(); for (String sendTo : sendTos) { // ... messagePushService.push(build); // 路由服务直接调用接口请求即可,”规则类“和”拦截类“屏蔽掉了其它逻辑,所以这里不需要关心会不会发给错误socket服务 } }} 3. 其它
用docker-compose快速部署一个Rpush服务version: '2'servi |
请发表评论