• 欢迎访问web前端中文站,JavaScript,CSS3,HTML5,web前端demo
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏web前端中文站吧

OpenMessaging访问RocketMQ教程(开发实战)

JAVA web前端中文站 2年前 (2017-10-16) 890次浏览 已收录 0个评论

目前网上关于 OpenMessaging 的技术文章还很少,我个人预测它将来会很火。而阿里捐献给 Apache 的 RocketMQ 对 OpenMessaging 提供了部分实现。本文将讲解他们之间的开发实战。

更多精彩内容请看 web 前端中文站
http://www.lisa33xiaoq.net 可按 Ctrl + D 进行收藏

前面我已经介绍过 《OpenMessaging 的架构原理》,没阅读过的可以看看,加深理解。

OpenMessaging 访问 RocketMQ 教程(开发实战)

OpenMessaging 包括建立行业准则和消息传递,流媒体规范为金融,电子商务,物联网和大数据区域提供了一个共同的框架。设计原则是分布式异构环境中面向云,简单,灵活和语言无关。符合这些规范将有可能在所有主要平台和操作系统上开发异构消息应用程序。

以下示例显示如何在同步,异步或单向传输中向 RocketMQ 代理发送消息。ProducerApp 生产者:

 package io.openmessaging.samples.producer; 
 import io.openmessaging.Future; 
 import io.openmessaging.FutureListener; 
 import io.openmessaging.MessagingAccessPoint; 
 import io.openmessaging.MessagingAccessPointFactory; 
 import io.openmessaging.producer.Producer; 
 import io.openmessaging.producer.SendResult; 
 import java.nio.charset.Charset; 

 public class ProducerApp {     
 public static void main(String[] args) {         
 final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory             
 .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");   
 // web 前端中文站:www.lisa33xiaoq.net        
 final Producer producer = messagingAccessPoint.createProducer();         
 messagingAccessPoint.startup();         
 System.out.println("MessagingAccessPoint startup OK");         
 producer.startup();         
 System.out.println("Producer startup OK");         
 //Add a shutdown hook         
 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {             
 @Override             
 public void run() {                 
 producer.shutdown();                 
 messagingAccessPoint.shutdown();}         }));         
 //Sync www.lisa33xiaoq.net         
 {             
 SendResult sendResult = producer.send(producer.createTopicBytesMessage(                 
 "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));              
 System.out.println("Send sync message OK, message id is: " + 
 sendResult.messageId());         }         
 //Async with Promise         {             
 final Future<SendResult> result = producer.sendAsync(
 producer.createTopicBytesMessage(                 
 "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));              
 final SendResult sendResult = result.get(3000L);             
 System.out.println("Send async message OK, message id is: " + 
 sendResult.messageId());         }         
 //Async with FutureListener{             
 final Future<SendResult> result = producer.sendAsync(
 producer.createTopicBytesMessage(                 
 "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));             
 result.addListener(new FutureListener<SendResult>() {                 
 @Override                 
 public void operationSucceeded(Future<SendResult> promise) {                     
 System.out.println("Send async message OK, message id is: " + 
 promise.get().messageId());                 }                 
 @Override                 
 public void operationFailed(Future<SendResult> promise) {                     
 System.out.println("Send async message Failed, cause is: " + 
 promise.getThrowable().getMessage());}             });         }         
 //Oneway         
 {            
 producer.sendOneway(producer.createTopicBytesMessage(                 
 "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));             
 System.out.println("Send oneway message OK");     } }

使用 PullConsumerApp 轮询来自指定队列的消息。PullConsumerApp 消费者:

 package io.openmessaging.samples.consumer; 
 import io.openmessaging.ResourceManager; 
 import io.openmessaging.Message; 
 import io.openmessaging.MessagingAccessPoint; 
 import io.openmessaging.MessagingAccessPointFactory; 
 import io.openmessaging.OMS; 
 import io.openmessaging.consumer.PullConsumer; 
 public class PullConsumerApp {     
 public static void main(String[] args) {         
 final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory             
 .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");         
 messagingAccessPoint.startup();//www.lisa33xiaoq.net         
 System.out.println("MessagingAccessPoint startup OK");         
 ResourceManager resourceManager = messagingAccessPoint.getResourceManager();   
 //www.lisa33xiaoq.net        
 resourceManager.createQueue("NS1", "HELLO_QUEUE", OMS.newKeyValue());         
 //PullConsumer only can pull messages from one queue.final 
 PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer("HELLO_QUEUE");         
 pullConsumer.startup();         
 //Poll one message from queue.Message message = pullConsumer.poll();         
 //Acknowledges the consumed message         
 pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.MessageId));         
 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {            
 @Override            
 public void run() {                 
 pullConsumer.shutdown();                 
 messagingAccessPoint.shutdown();}         }));     } }

将 PushConsumer 附加到指定的队列,并通过 MessageListener 消费消息。

 package io.openmessaging.samples.consumer; 
 import io.openmessaging.ResourceManager; 
 import io.openmessaging.Message; 
 import io.openmessaging.consumer.MessageListener; 
 import io.openmessaging.MessagingAccessPoint; 
 import io.openmessaging.MessagingAccessPointFactory; 
 import io.openmessaging.OMS; 
 import io.openmessaging.consumer.PushConsumer; 
 import io.openmessaging.exception.OMSResourceNotExistException; 
 public class PushConsumerApp {     
 public static void main(String[] args) throws OMSResourceNotExistException {         
 final MessagingAccessPoint messagingAccessPoint = 
 MessagingAccessPointFactory.getMessagingAccessPoint(
 "openmessaging:rocketmq://localhost:10911/namespace");         
 messagingAccessPoint.startup();         
 System.out.println("MessagingAccessPoint startup OK");         
 ResourceManager resourceManager = messagingAccessPoint.getResourceManager();         
 final PushConsumer consumer = messagingAccessPoint.createPushConsumer();         
 // Consume messages from a simple queue.         
 {String simpleQueue = "HELLO_QUEUE";             
 resourceManager.createQueue("NS1", simpleQueue, OMS.newKeyValue());             
 //This queue doesn't has a source topic, so only the message delivered to the 
 //queue directly can             
 //be consumed by this consumer..attachQueue(simpleQueue, new MessageListener() 
 {@Override                 
 public void onReceived(Message message, Context context) {                     
 System.out.println("Received one message: " + message);                     
 context.ack();}              });             
 consumer.startup();             
 System.out.println("Consumer startup OK");         }         
 //Consume messages from a complex queue.final PushConsumer anotherConsumer = 
 messagingAccessPoint.createPushConsumer();        
 {String complexQueue = "QUEUE_HAS_SOURCE_TOPIC";             
 String sourceTopic = "SOURCE_TOPIC";              
 //Create the complex queue.resourceManager.createQueue("NS_01", complexQueue, 
 OMS.newKeyValue());//www.lisa33xiaoq.net             
 //Create the source topic.resourceManager.createTopic("NS_01", sourceTopic, 
 OMS.newKeyValue());          }         
 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {             
 @Override             
 public void run() {                 
 consumer.shutdown();                 
 anotherConsumer.shutdown();                 
 messagingAccessPoint.shutdown();}}));     } }

相关源代码,请一步阅读:https://github.com/openmessaging/openmessaging-java。

【注:本文源自网络文章资源,由站长整理发布】


web 前端中文站 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:OpenMessaging 访问 RocketMQ 教程(开发实战)
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址