热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Golang实现UDPServer并发送消息到ActiveMQ

示例代码packagemainimport("net""os"&am

示例代码

package main

import (
    "net"
    "os"
    "github.com/gpmgo/gopm/modules/goconfig"
    "github.com/go-stomp/stomp"
    "time"
    "strconv"
    "log"
    "strings"
)




// 限制goroutine数量
var limitChan = make(chan bool, 10000) // Todo 从配置文件中读取

// 限制同时处理消息数量
var msgChan = make(chan string, 10000) // Todo 从配置文件中读取
var activeMqLimitedChan = make(chan bool, 100)
var activeMq *stomp.Conn
var activeQueue string
var host string
var port string
var cOnnectTimes= 0
var udpAddress = "0.0.0.0"    // Todo 从配置文件中读取
var udpPort = "514"    // Todo 从配置文件中读取
var logFilePath = "/var/log/syslog_server/"
var cOnfigFilePath= "./config.ini"
// UDP goroutine 实现并发读取UDP数据
func udpProcess(conn *net.UDPConn) {
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("udpProcess error:", e)
        }
        // 释放出一个协程
        <- limitChan
    }()

    // 最大读取数据大小
    data := make([]byte, 1024)
    n, _, err := conn.ReadFromUDP(data)
    if err != nil {
        panic(err)
    }

    // 获取对端的IP地址
    // remoteAddr := conn.RemoteAddr()
    // msgChan <- remoteAddr.String() + " " + string(data[:n])

    msgChan <- string(data[:n])


}


func udpServer(address, port string) {
    // @todo 如何防止udpServer 一直Panic导致无限循环重启
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("udpServer error:", e)

            // udpServer启动失败后,间隔10秒后重试
            time.Sleep(10 * time.Second)
            udpServer(udpAddress, udpPort)
        }
    }()

    udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
    conn, err := net.ListenUDP("udp", udpAddr)
    defer conn.Close()
    if err != nil {
        panic(err)
    }

    for {
        limitChan <- true
        go udpProcess(conn)
    }
}


// 读取ActiveMQ配置信息
func getConfiguration(){
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
                os.Exit(1)
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("Get Configuration error:", e)
        }
    }()

    configFile, err := goconfig.LoadConfigFile(configFilePath)
    if err != nil {
        panic(err)
    }

    host, err = configFile.GetValue("active_mq", "host")
    if err != nil {
        // 如果没有配置主机,则使用本地主机
        host = "127.0.0.1"
    }
    port, err = configFile.GetValue("active_mq", "port")
    if err != nil {
        // 如果没配置端口,则使用默认端口
        port = "61613"
    }

    activeQueue, err = configFile.GetValue("active_mq", "queue")
    if err != nil {
        // 如果没配置端口,则使用默认队列名
        activeQueue = "syslog.queue"
    }
}

// 使用IP和端口连接到ActiveMQ服务器, 返回ActiveMQ连接对象
func connActiveMq(){
    // @todo 如何防止无限循环
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("connActiveMq error:", e)

            // ActiveMQ服务器连接失败后,间隔3秒后重试
            time.Sleep(3 * time.Second)
            activeMq = nil
            connActiveMq()
        }
    }()

    // @todo 实现断开重连
    if activeMq == nil {
        var err error
        activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
        if err != nil {
            connectTimes ++
            if connectTimes >= 100 {
                time.Sleep(60 * time.Second)
            }else if connectTimes >= 10 {
                time.Sleep(10 * time.Second)
            }else {
                time.Sleep(3 * time.Second)
            }
            panic(err.Error() + ", 重新连接ActiveMQ, 已重试次数: " + strconv.Itoa(connectTimes))

        }else {
            connectTimes = 0
        }
    }
}


func activeMqProducer(c chan string){
    // @todo 如何防止activeMqProducer 退出
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("activeMqProducer error:", e)

            // 重试
            go activeMqProducer(msgChan)
        }
    }()
    for{
        activeMqLimitedChan <- true    // 限制开启协程数量
        contentMsg := <-c
        go func() {
            defer func() {
                if e := recover(); e != nil {
                    err := os.MkdirAll(logFilePath, 777)
                    log.Fatalln("create log dirctory error: ", err.Error())
                    // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                    logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                    logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                    defer logFile.Close()
                    if err != nil {
                        log.Fatalln("open log file error: ", err.Error())
                    }
                    logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

                    // 记录错误日志
                    logger.Println("activeMqProducer error:", e)
                }
                // 释放出一个协程
                <- activeMqLimitedChan
            }()

            err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
            if err != nil {
                if err.Error() == "connection already closed"{
                    activeMq = nil
                    connActiveMq()
                    activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
                }
                panic(err)
            }
        }()

    }

}


func init(){
    // 初始化 ActiveMQ 配置
    getConfiguration()

    // 连接到 ActiveMQ 服务器
    connActiveMq()

    // 启动一个协程将Syslog消息放入ActiveMQ队列中
    go activeMqProducer(msgChan)

}

func main() {
    defer activeMq.Disconnect()
    udpServer(udpAddress, udpPort)
}

 


推荐阅读
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • Google Play推出全新的应用内评价API,帮助开发者获取更多优质用户反馈。用户每天在Google Play上发表数百万条评论,这有助于开发者了解用户喜好和改进需求。开发者可以选择在适当的时间请求用户撰写评论,以获得全面而有用的反馈。全新应用内评价功能让用户无需返回应用详情页面即可发表评论,提升用户体验。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 代理模式的详细介绍及应用场景
    代理模式是一种在软件开发中常用的设计模式,通过在客户端和目标对象之间增加一层中间层,让代理对象代替目标对象进行访问,从而简化系统的复杂性。代理模式可以根据不同的使用目的分为远程代理、虚拟代理、Copy-on-Write代理、保护代理、防火墙代理、智能引用代理和Cache代理等几种。本文将详细介绍代理模式的原理和应用场景。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 来吹下汽车
    最近帮同事的一个朋友选车,最后他决定了一汽大众的迈腾,也就是海外版(欧洲为主)的帕萨特B8,国内如果加长过的话,应该叫B8L吧。基于大众最新的通用MQB平台(模块化横置发动机平台) ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • SQL Server 内存中OLTP内部机制概述(一)
    内存中OLTP(项目名为“Hekaton”)是一个新的完全集成到SQLServer中的数据库引擎组件。它专为访问内存常驻数据的OLTP工作负荷而进行优化。内存中OLTP有助于OLT ... [详细]
  • 三、查看Linux版本查看系统版本信息的命令:lsb_release-a[root@localhost~]#lsb_release-aLSBVersion::co ... [详细]
author-avatar
手机用户2602892387
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有