消息中间件之ActiveMQ的基本使用

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 消息中间件之ActiveMQ的基本使用,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

ActiveMQ实现步骤

建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址

通过ConnectionFactory对象创建一个Connection连接

通过Connection对象创建Session会话

通过Session对象创建Destination对象。在点对点模式中,Destination被称作队列(Queue),在Pub/Sub模式中,Destination被称作主题(Topic)

通过Session对象创建消息的发送和接收对象

发送消息

关闭资源

点对点模式

点对点模式的概述

点对点模式涉及:消息队列(Queue) 发送者(Sender)  接收者(Receiver)
	
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

比如生产方发了 10条消息到 activeMQ 服务器, 而此时有多个 消费方,那么这些消费方就会瓜分这些10条消息,一条消息只会被一个消费方得到。

点对点的特点

1.	每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)

2.	发送者和接收者之间在时间上没有依赖性,也就是当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

3.	接收者在成功接收消息之后需向队列应答成功

引入依赖

<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>

生产者

public class Producer {
    private static String BROKERURL = "tcp://127.0.0.1:61616";
    private static String QUEUE = "myQueue";

 public static void main(String[] args) throws JMSException {
        //创建ActiveMQConnectionFactory会话工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        //创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动连接
        connection.start();
        //Session:一个发送或接收消息的线程
        //false:不开启消息事物,true: 表示以事务提交 设置消息可靠性
        //Session.AUTO_ACKNOWLEDGE :表示消息自动签收
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //创建一个队列
        Queue queue = session.createQueue(QUEUE);
        //MessageProducer:消息生产者
        MessageProducer producer = session.createProducer(queue);
        //设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //发送消息
        send(producer, session);
        System.out.println("发送成功!");
        session.close();
        connection.close();
    }

    static public void send(MessageProducer producer, Session session) throws JMSException {
        for (int i = 1; i <= 5; i++) {
            System.out.println("我是消息" + i);
            //创建文本消息
            TextMessage textMessage = session.createTextMessage("我是消息" + i);
            //通过消息生产者发出消息
            producer.send(textMessage);
        }
    }
}

消费者


public class receiver {
    private static String BROKERURL = "tcp://127.0.0.1:61616";
    private static String QUEUE = "myQueue";

    public static void main(String[] args) throws JMSException {
        //创建ActiveMQConnectionFactory会话工厂 创建连接
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动连接
        connection.start();
        //不开消息启事物,自动签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建队列
        Queue queue = session.createQueue(QUEUE);
        //MessageConsumer:消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            TextMessage textMessage = (TextMessage) consumer.receive();
            if (textMessage != null) {
                System.out.println("接受到消息:" + textMessage.getText());
            } else {
                break;
            }
        }
        session.close();
        connection.close();
    }
}

发布订阅模式

发布订阅模式概述

发布订阅模式涉及:主题(Topic) 发布者(Publisher) 订阅者(Subscriber) 


发布者将消息发送到Topic,系统将这些消息传递给多个订阅者

特点

1.每个消息可以有多个消费者

2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

3.为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

4.如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

引入依赖

<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>

生产者

public class Producer {
    private static String BROKERURL = "tcp://127.0.0.1:61616";
    private static String TOPIC = "myTopic";

    public static void main(String[] args) throws JMSException {
        //创建ActiveMQConnectionFactory会话工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        //创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动连接
        connection.start();
        //Session:一个发送或接收消息的线程
        //false:不开启消息事物,true: 表示以事务提交 设置消息可靠性
        //Session.AUTO_ACKNOWLEDGE :表示消息自动签收
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //创建一个队列
        //Queue queue = session.createQueue(QUEUE);
        Topic topic = session.createTopic(TOPIC);
        //MessageProducer:消息生产者
        MessageProducer producer = session.createProducer(topic);
        //设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //发送消息
        send(producer, session);
        System.out.println("发送成功!");
        session.close();
        connection.close();
    }

    static public void send(MessageProducer producer, Session session) throws JMSException {
        for (int i = 1; i <= 5; i++) {
            System.out.println("我是消息" + i);
            //创建文本消息
            TextMessage textMessage = session.createTextMessage("我是消息" + i);
            //通过消息生产者发出消息
            producer.send(textMessage);
        }
    }
}

消费者

public class receiver {
    private static String BROKERURL = "tcp://127.0.0.1:61616";
	private static String TOPIC = "myTopic";

    public static void main(String[] args) throws JMSException {
        //创建ActiveMQConnectionFactory会话工厂 创建连接
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        //启动连接
        connection.start();
        //不开消息启事物,自动签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //创建队列
        //Queue queue = session.createQueue(QUEUE);
		Topic topic = session.createTopic(TOPIC);
        //MessageConsumer:消息消费者
        MessageConsumer consumer = session.createConsumer(topic);
        while (true) {
            TextMessage textMessage = (TextMessage) consumer.receive();
            if (textMessage != null) {
                System.out.println("接受到消息:" + textMessage.getText());
                //手动应答
               textMessage.acknowledge();
            } else {
                break;
            }
        }
        session.close();
        connection.close();
    }
}

消息可靠机制

ActiveMQ消息签收机制:客戶端成功接收一条消息的标志是一条消息被签收,成功应答。

消息的签收情形分两种:带事务的session与不带事务的session

带事务的session

如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。

生成者

 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

 //创建文本消息
 TextMessage textMessage = session.createTextMessage("我是消息" + i);
 //通过消息生产者发出消息
producer.send(textMessage);
//提交消息
session.commit();

消费者

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

 TextMessage textMessage = (TextMessage) consumer.receive();
            if (textMessage != null) {
                System.out.println("接受到消息:" + textMessage.getText());
                session.commit();
            } 

不带事务的session

不带事务的session的签收方式,取决于session的配置。

1.Session.AUTO_ACKNOWLEDGE

Session.AUTO_ACKNOWLEDGE  消息自动签收

2.Session.CLIENT_ACKNOWLEDGE

Session.CLIENT_ACKNOWLEDGE  客戶端调用acknowledge()方法手动签收

生成者

 Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

  //创建文本消息
 TextMessage textMessage = session.createTextMessage("我是消息" + i);
 //通过消息生产者发出消息
 producer.send(textMessage);

消费者

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

  if (textMessage != null) {
                System.out.println("接受到消息:" + textMessage.getText());
                //手动应答
               textMessage.acknowledge();
            } 

3.Session.DUPS_OK_ACKNOWLEDGE

Session.DUPS_OK_ACKNOWLEDGE 不是必须签收,消息可能会重复发送。

在第二次重新传送消息的时候,消息只有在被确认之后,才认为已经被成功地消费了。

ActiveMQ与Spring集成

引入依赖

<dependencies>
          <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
         </dependency>
          <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>    
            <version>5.1.8.RELEASE</version>
        </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>5.1.8.RELEASE</version>
      </dependency>
          <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
              <version>5.1.8.RELEASE</version>
          </dependency>
      <!--activemq-client中引入的jms是1.x,5.xSpring需要2.x版本的jms-->
      <dependency>
          <groupId>javax.jms</groupId>
          <artifactId>javax.jms-api</artifactId>
          <version>2.0.1</version>
      </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
        </dependency>
  </dependencies>

xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

   <context:component-scan base-package="cn.ybzy"></context:component-scan>

    <!-- 产生ConnectionConnectionFactory,由对应的JMS服务厂商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>  
    </bean>          
    
    <!-- Spring用于管理ConnectionFactoryConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    
    <!-- 目标ConnectionFactory对应真实的可以产生JMS ConnectionConnectionFactory -->  
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>         
    
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <property name="connectionFactory" ref="connectionFactory"/>  
    </bean>      
    
    <!--点对点模式 -->
    <bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg value="queue"/>
    </bean>

    <!--发布、订阅模式-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
       
    <!-- 监听类 -->
    <bean id="myMessageListener" class="cn.ybzy.MyMessageListener"></bean>
    
    <!-- 消息监听容器,会伴随spring的启动 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="textDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>           

</beans>

生产者

@Component
public class Producer {
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Autowired
    private Destination textDestination;
    
    public void sendTextMessage(final String text){
        jmsTemplate.send(textDestination, new MessageCreator() {
            
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }
}

消费者监听

public class MyMessageListener implements MessageListener {

    public void onMessage(Message message) {
        TextMessage textMessage=(TextMessage)message;        
        try {
            System.out.println("消费者接收到消息:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring_jms.xml")
public class TestProducer {
    @Autowired
    private Producer producer;

    @Test
    public void testSend(){
    	for (int i = 0; i < 100; i++) {
            producer.sendTextMessage("消息 " + i);
		}
    }
}

ActiveMQ与SpringBoot集成

引入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.8.RELEASE</version>
    </parent>
   
        <!-- springboot整合activemq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- 集成发送邮件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

生产者

##activemq连接信息
spring:
  activemq:
    broker-url: tcp://localhost:61616
    in-memory: true
    pool:
      enabled: false

##队列      
messages:
   queue: mail_queue

发送消息

@Service("registerMailboxProducer")
public class RegisterMailboxProducer {
	@Autowired
    private JmsMessagingTemplate jmsMessagingTemplate ; 
	
	public void send(Destination destination,String json){
		jmsMessagingTemplate.convertAndSend(destination, json);
	}
}

发送消息内容封装

@Value("${messages.queue}")
    private String MESSAGES_QUEUE;

    @Override
    public void regist(User user) {
        //TODO 注册逻辑,注册成功后发送邮件

        // 指定队列
        Destination activeMQQueue = new ActiveMQQueue(MESSAGES_QUEUE);
        // 消息内容
        String mailMessage = mailMessage(user.getEmail(), user.getUserName());
        log.info("###regist() 注册发送邮件内容:{}", mailMessage);
        // 发送消息
        registerMailboxProducer.send(activeMQQueue, mailMessage);
    }

    private String mailMessage(String email, String userName) {
        JSONObject root = new JSONObject();
        JSONObject header = new JSONObject();
        header.put("interfaceType", "sms_mail");
        JSONObject content = new JSONObject();
        content.put("mail", email);
        content.put("userName", userName);
        root.put("header", header);
        root.put("content", content);
        return root.toJSONString();
    }

消费者

spring:
  application:
    name: message
  activemq:
    broker-url: tcp://localhost:61616
    in-memory: true
    pool:
      enabled: false
  mail:
    host: smtp.163.com
    username: XXX@163.com
    password: 授权码

消息分发接口

public interface MessageAdapter {
    /**
     * 消息分发
     * @param jsonObject
     */
    public void distribute(JSONObject jsonObject);
}

消息监听,进行分发消息

@Slf4j
@Component
public class ConsumerDistribute {
	@Autowired
	private SMSMailboxService smsMailboxService;

	@JmsListener(destination = "mail_queue")
	public void distribute(String json) {
		log.info("###收到消息,消息内容 json:{}", json);
		if (StringUtils.isEmpty(json)) {
			return;
		}
		JSONObject jsonObject = JSON.parseObject(json);
		JSONObject header = jsonObject.getJSONObject("header");
		String interfaceType = header.getString("interfaceType");
		MessageAdapter messageAdapter = null;
		switch (interfaceType) {
		// 发送邮件
		case "sms_mail":
			messageAdapter=smsMailboxService;
			break;
		default:
			break;
		}
		JSONObject content=jsonObject.getJSONObject("content");
		messageAdapter.distribute(content);
	}
}

发送邮件实现

@Slf4j
@Service
public class SMSMailboxService  implements MessageAdapter {
	@Autowired
	private JavaMailSender mailSender;

	@Value("${spring.mail.username}")
	private String EMAIL_NAME;

	@Override
	public void distribute(JSONObject jsonObject) {
		String mail=jsonObject.getString("mail");
		String userName=jsonObject.getString("userName");
		log.info("###消费者收到消息... mail:{},userName:{}",mail,userName);
		// 发送邮件
		SimpleMailMessage message = new SimpleMailMessage();
		message.setFrom(EMAIL_NAME);
		message.setTo(mail);
		message.setSubject("恭喜您注册成功。");
		message.setText("恭喜您"+userName+",成为XXX的新用户!");
		log.info("###发送短信邮箱 mail:{}", mail);
		mailSender.send(message);
	}
}

测试

http://127.0.0.1:8762/regist

{
     "userName":"小白",
     "email":"55005500@qq.com"
}

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/136937.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!