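Technical articles about OpenMessaging are still scarce online, although I personally expect it to become popular. RocketMQ, which Alibaba donated to Apache, provides a partial implementation of OpenMessaging. This article walks through hands-on development with the two.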
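I covered the background earlier in 《OpenMessaging 的架构原理》 (The Architecture of OpenMessaging); if you have not read it, it is worth a look for a deeper understanding.
OpenMessaging establishes industry guidelines together with messaging and streaming specifications, providing a common framework for finance, e-commerce, IoT, and big data. Its design principles are to be cloud-oriented, simple, flexible, and language-independent in distributed heterogeneous environments. Conforming to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.
The following example shows how to send messages to a RocketMQ broker in synchronous, asynchronous, or one-way mode. The ProducerApp producer: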
package io.openmessaging.samples.producer;

import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;

public class ProducerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.println("MessagingAccessPoint startup OK");

        producer.startup();
        System.out.println("Producer startup OK");

        // Add a shutdown hook that releases the producer and the access point.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                producer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        // Sync: block until the broker returns a SendResult.
        {
            SendResult sendResult = producer.send(producer.createTopicBytesMessage(
                "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.println("Send sync message OK, message id is: " +
                sendResult.messageId());
        }

        // Async with Promise: send without blocking, then wait up to 3000 ms for the result.
        {
            final Future<SendResult> result = producer.sendAsync(
                producer.createTopicBytesMessage(
                    "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            final SendResult sendResult = result.get(3000L);
            System.out.println("Send async message OK, message id is: " +
                sendResult.messageId());
        }

        // Async with FutureListener: get a callback on success or failure.
        {
            final Future<SendResult> result = producer.sendAsync(
                producer.createTopicBytesMessage(
                    "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new FutureListener<SendResult>() {
                @Override
                public void operationSucceeded(Future<SendResult> promise) {
                    System.out.println("Send async message OK, message id is: " +
                        promise.get().messageId());
                }

                @Override
                public void operationFailed(Future<SendResult> promise) {
                    System.out.println("Send async message Failed, cause is: " +
                        promise.getThrowable().getMessage());
                }
            });
        }

        // Oneway: fire and forget, no result is returned.
        {
            producer.sendOneway(producer.createTopicBytesMessage(
                "HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.println("Send oneway message OK");
        }
    }
}
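To make the difference between the three send modes easier to see without a running broker, here is a minimal sketch that uses only the JDK. It is not the OpenMessaging or RocketMQ API: `SendModesSketch`, `deliver`, and the `MSG-n` ids are hypothetical stand-ins, and `CompletableFuture` is swapped in for the `Future`/`FutureListener` pair used in the producer example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker client; not part of OpenMessaging or RocketMQ.
final class SendModesSketch {
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    // Simulates a broker round trip and returns a made-up message id.
    static String deliver(String body) {
        return "MSG-" + NEXT_ID.incrementAndGet();
    }

    // Synchronous: the caller blocks until the id is available.
    static String sendSync(String body) {
        return deliver(body);
    }

    // Asynchronous: the caller gets a future at once and decides later
    // whether to wait on it or to attach a callback.
    static CompletableFuture<String> sendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> deliver(body));
    }

    // One-way: fire and forget; neither a result nor a failure reaches the caller.
    static void sendOneway(String body) {
        CompletableFuture.runAsync(() -> deliver(body));
    }

    public static void main(String[] args) {
        System.out.println("sync id: " + sendSync("HELLO_BODY"));

        // Future style: wait for the result (the producer example uses a 3000 ms timeout).
        String id = sendAsync("HELLO_BODY").join();
        System.out.println("async (waited) id: " + id);

        // Listener style: react when the send completes or fails.
        CompletableFuture<String> done = sendAsync("HELLO_BODY")
            .whenComplete((msgId, error) -> {
                if (error == null) {
                    System.out.println("async (callback) id: " + msgId);
                } else {
                    System.out.println("async failed: " + error.getMessage());
                }
            });
        done.join(); // only so the demo does not exit before the callback runs

        sendOneway("HELLO_BODY");
        System.out.println("oneway sent");
    }
}
```

The trade-off is the usual one: synchronous sends are the simplest to reason about, asynchronous sends keep the calling thread free, and one-way sends are the cheapest but give the caller no delivery confirmation.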
Use PullConsumerApp to poll messages from a specified queue. The PullConsumerApp consumer:
package io.openmessaging.samples.consumer;

import io.openmessaging.ResourceManager;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.consumer.PullConsumer;

public class PullConsumerApp {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://localhost:10911/namespace");

        messagingAccessPoint.startup();
        System.out.println("MessagingAccessPoint startup OK");

        ResourceManager resourceManager = messagingAccessPoint.getResourceManager();
        resourceManager.createQueue("NS1", "HELLO_QUEUE", OMS.newKeyValue());

        // A PullConsumer can only pull messages from one queue.
        final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer("HELLO_QUEUE");
        pullConsumer.startup();

        // Poll one message from the queue.
        Message message = pullConsumer.poll();

        // Acknowledge the consumed message.
        pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.MessageId));

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                pullConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
    }
}
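The poll-then-acknowledge handshake in the pull consumer can be sketched with a small in-memory queue. Everything below is hypothetical (`PollAckSketch`, `Msg`, `redeliverUnacked`) and uses only the JDK; it is meant to illustrate the idea that a polled message stays pending until it is acknowledged, not how RocketMQ implements it. The sketch also assumes that a poll can come back empty, so it checks for `null` before acknowledging; whether `poll()` can return `null` in a given OpenMessaging version is an assumption to verify against that version's documentation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory queue that only mimics the poll/ack handshake.
// None of these names come from OpenMessaging or RocketMQ.
final class PollAckSketch {
    static final class Msg {
        final String id;
        final String body;

        Msg(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    private final Deque<Msg> ready = new ArrayDeque<>();
    private final Map<String, Msg> inFlight = new LinkedHashMap<>();

    void publish(String id, String body) {
        ready.addLast(new Msg(id, body));
    }

    // Hands out the next message, or null when the queue is empty.
    // The message stays "in flight" until ack(id) is called.
    Msg poll() {
        Msg next = ready.pollFirst();
        if (next != null) {
            inFlight.put(next.id, next);
        }
        return next;
    }

    // Confirms a message; returns false for an unknown or already acked id.
    boolean ack(String id) {
        return inFlight.remove(id) != null;
    }

    // Re-queues every unacknowledged message at the back so it can be polled again.
    void redeliverUnacked() {
        ready.addAll(inFlight.values());
        inFlight.clear();
    }

    int unacked() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        PollAckSketch queue = new PollAckSketch();
        queue.publish("id-1", "HELLO_BODY");

        Msg message = queue.poll();
        if (message != null) { // guard before touching the message
            System.out.println("polled " + message.id + ": " + message.body);
            queue.ack(message.id);
        }
        System.out.println("unacked: " + queue.unacked());
    }
}
```

Acknowledging by message id, as the pull consumer example does, lets the queue tell processed messages apart from ones that were handed out but never confirmed.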
Attach a PushConsumer to a specified queue and consume messages through a MessageListener.
package io.openmessaging.samples.consumer;

import io.openmessaging.ResourceManager;
import io.openmessaging.Message;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSResourceNotExistException;

public class PushConsumerApp {
    public static void main(String[] args) throws OMSResourceNotExistException {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory.getMessagingAccessPoint(
            "openmessaging:rocketmq://localhost:10911/namespace");

        messagingAccessPoint.startup();
        System.out.println("MessagingAccessPoint startup OK");

        ResourceManager resourceManager = messagingAccessPoint.getResourceManager();
        final PushConsumer consumer = messagingAccessPoint.createPushConsumer();

        // Consume messages from a simple queue.
        {
            String simpleQueue = "HELLO_QUEUE";
            resourceManager.createQueue("NS1", simpleQueue, OMS.newKeyValue());

            // This queue doesn't have a source topic, so only messages delivered
            // to the queue directly can be consumed by this consumer.
            consumer.attachQueue(simpleQueue, new MessageListener() {
                @Override
                public void onReceived(Message message, Context context) {
                    System.out.println("Received one message: " + message);
                    context.ack();
                }
            });

            consumer.startup();
            System.out.println("Consumer startup OK");
        }

        // Consume messages from a complex queue.
        final PushConsumer anotherConsumer = messagingAccessPoint.createPushConsumer();
        {
            String complexQueue = "QUEUE_HAS_SOURCE_TOPIC";
            String sourceTopic = "SOURCE_TOPIC";

            // Create the complex queue.
            resourceManager.createQueue("NS_01", complexQueue, OMS.newKeyValue());

            // Create the source topic.
            resourceManager.createTopic("NS_01", sourceTopic, OMS.newKeyValue());
        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                anotherConsumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
    }
}
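The push model inverts control: instead of asking for messages, the consumer registers a listener and the messaging side calls it. A minimal JDK-only sketch of that callback flow follows. `PushListenerSketch`, its `Listener` and `Context` interfaces, and `deliver` are hypothetical names loosely modeled on the example above; they are not the OpenMessaging types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical dispatcher that mimics attachQueue + listener callbacks.
// It is not the OpenMessaging PushConsumer.
final class PushListenerSketch {
    interface Context {
        void ack();
    }

    interface Listener {
        void onReceived(String message, Context context);
    }

    private final Map<String, Listener> listeners = new HashMap<>();
    private final List<String> acked = new ArrayList<>();

    // Registers the listener that will receive messages for this queue.
    void attachQueue(String queue, Listener listener) {
        listeners.put(queue, listener);
    }

    // Pushes one message to the queue's listener.
    // Returns false when no listener is attached to that queue.
    boolean deliver(String queue, String message) {
        Listener listener = listeners.get(queue);
        if (listener == null) {
            return false;
        }
        // The context lets the listener confirm this specific message.
        listener.onReceived(message, () -> acked.add(message));
        return true;
    }

    List<String> acked() {
        return acked;
    }

    public static void main(String[] args) {
        PushListenerSketch dispatcher = new PushListenerSketch();
        dispatcher.attachQueue("HELLO_QUEUE", (message, context) -> {
            System.out.println("Received one message: " + message);
            context.ack();
        });

        dispatcher.deliver("HELLO_QUEUE", "HELLO_BODY");
        System.out.println("acked: " + dispatcher.acked());
    }
}
```

Passing a per-message context into the callback, rather than acknowledging by id as in the pull example, keeps the listener from having to know how messages are identified.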
For the related source code and further reading, see https://github.com/openmessaging/openmessaging-java.
[Note: This article is drawn from articles available online and was compiled and published by the site administrator.]