热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Golang实现UDPServer并发送消息到ActiveMQ

示例代码packagemainimport("net""os"&am

示例代码

package main

import (
    "net"
    "os"
    "github.com/gpmgo/gopm/modules/goconfig"
    "github.com/go-stomp/stomp"
    "time"
    "strconv"
    "log"
    "strings"
)




// 限制goroutine数量
var limitChan = make(chan bool, 10000) // Todo 从配置文件中读取

// 限制同时处理消息数量
var msgChan = make(chan string, 10000) // Todo 从配置文件中读取
var activeMqLimitedChan = make(chan bool, 100)
var activeMq *stomp.Conn
var activeQueue string
var host string
var port string
var cOnnectTimes= 0
var udpAddress = "0.0.0.0"    // Todo 从配置文件中读取
var udpPort = "514"    // Todo 从配置文件中读取
var logFilePath = "/var/log/syslog_server/"
var cOnfigFilePath= "./config.ini"
// UDP goroutine 实现并发读取UDP数据
func udpProcess(conn *net.UDPConn) {
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("udpProcess error:", e)
        }
        // 释放出一个协程
        <- limitChan
    }()

    // 最大读取数据大小
    data := make([]byte, 1024)
    n, _, err := conn.ReadFromUDP(data)
    if err != nil {
        panic(err)
    }

    // 获取对端的IP地址
    // remoteAddr := conn.RemoteAddr()
    // msgChan <- remoteAddr.String() + " " + string(data[:n])

    msgChan <- string(data[:n])


}


func udpServer(address, port string) {
    // @todo 如何防止udpServer 一直Panic导致无限循环重启
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("udpServer error:", e)

            // udpServer启动失败后,间隔10秒后重试
            time.Sleep(10 * time.Second)
            udpServer(udpAddress, udpPort)
        }
    }()

    udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
    conn, err := net.ListenUDP("udp", udpAddr)
    defer conn.Close()
    if err != nil {
        panic(err)
    }

    for {
        limitChan <- true
        go udpProcess(conn)
    }
}


// 读取ActiveMQ配置信息
func getConfiguration(){
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
                os.Exit(1)
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("Get Configuration error:", e)
        }
    }()

    configFile, err := goconfig.LoadConfigFile(configFilePath)
    if err != nil {
        panic(err)
    }

    host, err = configFile.GetValue("active_mq", "host")
    if err != nil {
        // 如果没有配置主机,则使用本地主机
        host = "127.0.0.1"
    }
    port, err = configFile.GetValue("active_mq", "port")
    if err != nil {
        // 如果没配置端口,则使用默认端口
        port = "61613"
    }

    activeQueue, err = configFile.GetValue("active_mq", "queue")
    if err != nil {
        // 如果没配置端口,则使用默认队列名
        activeQueue = "syslog.queue"
    }
}

// 使用IP和端口连接到ActiveMQ服务器, 返回ActiveMQ连接对象
func connActiveMq(){
    // @todo 如何防止无限循环
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("connActiveMq error:", e)

            // ActiveMQ服务器连接失败后,间隔3秒后重试
            time.Sleep(3 * time.Second)
            activeMq = nil
            connActiveMq()
        }
    }()

    // @todo 实现断开重连
    if activeMq == nil {
        var err error
        activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
        if err != nil {
            connectTimes ++
            if connectTimes >= 100 {
                time.Sleep(60 * time.Second)
            }else if connectTimes >= 10 {
                time.Sleep(10 * time.Second)
            }else {
                time.Sleep(3 * time.Second)
            }
            panic(err.Error() + ", 重新连接ActiveMQ, 已重试次数: " + strconv.Itoa(connectTimes))

        }else {
            connectTimes = 0
        }
    }
}


func activeMqProducer(c chan string){
    // @todo 如何防止activeMqProducer 退出
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("activeMqProducer error:", e)

            // 重试
            go activeMqProducer(msgChan)
        }
    }()
    for{
        activeMqLimitedChan <- true    // 限制开启协程数量
        contentMsg := <-c
        go func() {
            defer func() {
                if e := recover(); e != nil {
                    err := os.MkdirAll(logFilePath, 777)
                    log.Fatalln("create log dirctory error: ", err.Error())
                    // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                    logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                    logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                    defer logFile.Close()
                    if err != nil {
                        log.Fatalln("open log file error: ", err.Error())
                    }
                    logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

                    // 记录错误日志
                    logger.Println("activeMqProducer error:", e)
                }
                // 释放出一个协程
                <- activeMqLimitedChan
            }()

            err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
            if err != nil {
                if err.Error() == "connection already closed"{
                    activeMq = nil
                    connActiveMq()
                    activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
                }
                panic(err)
            }
        }()

    }

}


func init(){
    // 初始化 ActiveMQ 配置
    getConfiguration()

    // 连接到 ActiveMQ 服务器
    connActiveMq()

    // 启动一个协程将Syslog消息放入ActiveMQ队列中
    go activeMqProducer(msgChan)

}

func main() {
    defer activeMq.Disconnect()
    udpServer(udpAddress, udpPort)
}

 


推荐阅读
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • vue使用
    关键词: ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
author-avatar
手机用户2602892387
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有