热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java消息队列Spring整合ActiveMq我是小强zz

1、概述首先和大家一起回顾一下Java消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:然后在另一篇博客《Java消息队列-ActiveMq实战》中

1、概述


 

  首先和大家一起回顾一下Java 消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:

  1. 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现。
  2. 优势:异步、可靠
  3. 消息模型:点对点,发布/订阅
  4. JMS中的对象

  然后在另一篇博客《Java消息队列-ActiveMq实战》中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解:  

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

  在接下来的这篇博客中,我会和大家一起来整合Spring 和ActiveMq,这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

 

2、目录结构

 


  2.1 项目目录

      IDE选择了IDEA(建议大家使用),为了避免下载jar 的各种麻烦,底层使用maven搭建了一个项目,整合了Spring 和ActiveMq

   

   

    2.2 pom.xml

 View Code

    因为这里pom.xml 文件有点长,就不展开了。

    我们可以看到其实依赖也就几个,1、Spring 核心依赖 2、ActiveMq core和pool(这里如果同学们选择导入jar,可以直接导入我们上一篇博客中说道的那个activemq-all 这个jar包)3、java servlet 相关依赖

    这里面我们选择的ActiveMq pool 的依赖版本会和之后的dtd 有关系,需要版本对应,所以同学们等下配置activemq 文件的时候,需要注意dtd 版本选择

 

    2.3 web.xml

    web.xml 也大同小异,指定Spring 配置文件,springMvc 命名,编码格式




  Archetype Created Web Application

  
  
    contextConfigLocation
    
      classpath:applicationContext*.xml;
    
  

  
    org.springframework.web.context.ContextLoaderListener
  

  
    springMVC
    org.springframework.web.servlet.DispatcherServlet
    
      contextConfigLocation
      classpath:spring-mvc.xml
    
    1
  
  
    springMVC
    /
  

  
  
    characterEncodingFilter
    org.springframework.web.filter.CharacterEncodingFilter
    
      encoding
      UTF-8
    
    
      forceEncoding
      true
    
  
  
    characterEncodingFilter
    /*
  

 

 

    2.4 SpringMvc 和applicationContext.xml

      这里面的SpringMVC没什么特别,有需要的同学可以参考一下:

 View Code

      applicationContext.xml 主要使用来装载Bean,我们项目中并没有什么特别的Java Bean,因此只用来指出包扫描路径:

 View Code

 

   

    2.5 applicationContext-ActiveMQ.xml




    
    

    

    
    
        
        
    

    
    
        
        
            Jaycekon
        
    

    
    
        
        
        
        
        
    


    
    

    
    
        
        
        
    

 

       这里和大家讲解一下这个配置文件,如果大家能够从上述配置文件中看懂,可以跳过。同学们也可以在ActiveMQ官网中的查看。

       1、ActiveMq 中的DTD,我们在声明相关配置之前,我们需要先导入ActiveMq 中的DTD,不然Spring 并不理解我们的标签是什么意思。

         http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd

        我们在pom.xml 文件中有配置了activemq 的版本依赖我们这里的版本,需要和依赖的版本一样,不然是找不到相关的dtd

       2、amq:connectionFactory:很直白的一个配置项,用于配置我们链接工厂的地址和用户名密码,这里需要注意的是选择tcp连接而不是http连接

       3、jmsTemplate:比较重要的一个配置,这里指定了连接工厂,默认消息发送目的地,还有连接时长,发布消息的方式

 

3、项目结构


  3.1 ProducerService

package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ProducerService {

    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination,final String msg){
        System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    public void sendMessage(final String msg){
        String destination = jmsTemplate.getDefaultDestinationName();
        System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
}

 

     将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用ProducerService实例中的sendMessage 方法就可以向默认目的发送一个消息。

    这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。

    有兴趣的同学可以和我上一篇文章《ActiveMq实战》中ActiveMq 发送消息的方式对比一下,可以发现一些不同。

 

 

   3.2 ConsumerService

package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ConsumerService {
    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    public TextMessage receive(Destination destination){
        TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
        try{
            System.out.println("从队列" + destination.toString() + "收到了消息:\t"
                    + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return textMessage;
    }
}

 

 

     因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。

     再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。

 

   3.3 MessageController

package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Controller
public class MessageController {
    private Logger logger = LoggerFactory.getLogger(MessageController.class);
    @Resource(name = "demoQueueDestination")
    private Destination destination;

    //队列消息生产者
    @Resource(name = "producerService")
    private ProducerService producer;

    //队列消息消费者
    @Resource(name = "consumerService")
    private ConsumerService consumer;

    @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
    @ResponseBody
    public void send(String msg) {
        logger.info(Thread.currentThread().getName()+"------------send to jms Start");
        producer.sendMessage(msg);
        logger.info(Thread.currentThread().getName()+"------------send to jms End");
    }

    @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
    @ResponseBody
    public Object receive(){
        logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
        TextMessage tm = consumer.receive(destination);
        logger.info(Thread.currentThread().getName()+"------------receive from jms End");
        return tm;
    }

}

    控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。

    现在服务层和控制层都好了,接下来我们就进行一个简单的测试

 

4、项目测试


  4.1 启动ActiveMq

      先确定你的ActiveMQ服务已经开启。

    

 

 

  4.2 启动项目

    项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。


      
        org.apache.tomcat.maven
        tomcat7-maven-plugin
        
          8080
          /
        
      

 

 

 

  4.3 发送消息

  这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!

    

    我们发送了一个post 请求之后,看一下服务器的效果:

    我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:

    我们可以看到,一条消息已经成功发送到了ActiveMq中。

 

 

  4.4 接收消息

    使用get请求访问服务器后台:

  

 

     服务的输出:

 

     ActiveMq服务器状态:

    我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。

  

  4.5 监听器

    在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。

    不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。

   4.5.1 applicationContext-ActiveMQ.xml 配置

      在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。

    
    

    
    
        
        
        
    

 

 

   4.5.2 MessageListener

      我们需要创建一个类实现MessageListener 接口:

package com.Jayce.Filter;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
public class QueueMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("QueueMessageListener监听到了文本消息:\t"
                    + tm.getText());
            //do something ...
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

   实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。 

 

   4.5.3 测试

    和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:

   

    再看看ActiveMQ服务器的状态:

  我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。

  这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。

 

  4.6 压力测试

    这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/1/5.
 */
public class Client {

    @Test
    public void test() {
        HttpClient httpClient = new HttpClient();
        new Thread(new Sender(httpClient)).start();

    }

}

class Sender implements Runnable {
    public static AtomicInteger count = new AtomicInteger(0);
    HttpClient httpClient;

    public Sender(HttpClient client) {
        httpClient = client;
    }

    public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
                PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
                post.addParameter("msg", "Hello world!");
                httpClient.executeMethod(post);
                System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

 

     这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。


推荐阅读
  • XMLhttpREquest_Ajax技术总结之XmlHttpRequest
    Ajax1、 什么是ajax   ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了使用AJAX的POST请求实现数据修改功能的方法。通过ajax-post技术,可以实现在输入某个id后,通过ajax技术调用post.jsp修改具有该id记录的姓名的值。文章还提到了AJAX的概念和作用,以及使用async参数和open()方法的注意事项。同时强调了不推荐使用async=false的情况,并解释了JavaScript等待服务器响应的机制。 ... [详细]
  • 本文介绍了高校天文共享平台的开发过程中的思考和规划。该平台旨在为高校学生提供天象预报、科普知识、观测活动、图片分享等功能。文章分析了项目的技术栈选择、网站前端布局、业务流程、数据库结构等方面,并总结了项目存在的问题,如前后端未分离、代码混乱等。作者表示希望通过记录和规划,能够理清思路,进一步完善该平台。 ... [详细]
  • iOS超签签名服务器搭建及其优劣势
    本文介绍了搭建iOS超签签名服务器的原因和优势,包括不掉签、用户可以直接安装不需要信任、体验好等。同时也提到了超签的劣势,即一个证书只能安装100个,成本较高。文章还详细介绍了超签的实现原理,包括用户请求服务器安装mobileconfig文件、服务器调用苹果接口添加udid等步骤。最后,还提到了生成mobileconfig文件和导出AppleWorldwideDeveloperRelationsCertificationAuthority证书的方法。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • POCOCLibraies属于功能广泛、轻量级别的开源框架库,它拥有媲美Boost库的功能以及较小的体积广泛应用在物联网平台、工业自动化等领域。POCOCLibrai ... [详细]
  • centos6.8 下nginx1.10 安装 ... [详细]
  • 初识java关于JDK、JRE、JVM 了解一下 ... [详细]
author-avatar
打杂大叔_868
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有