热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Web服务http日志收集

Web服务http日志收集,Go语言社区,Golang程序员人脉社

目录

需求场景:

架构设计:

端口号:

准备资源:

Linux版本:

准备Java环境

ElasticSearch:

下载安装

配置:

启动:

Zookeeper

下载安装:

配置:

启动:

Kafka

下载安装:

配置:

启动命令:

准备和调试:

Logstash:

下载安装:

配置:

logstash服务配置:

被监听的服务配置:

启动命令:

Kibana:

下载安装:

配置:

启动

操作:

案例:

Web服务代码改造:

集成Kafka:

pom.xml添加依赖:

 添加kafka的配置:

 利用切面捕获入库信息:

需要注意的细节:


需求场景:

目前已经运行了一个JavaWeb应用,需要能自定义收集http的request和response,做到报文可追溯,可统计,方便查询,同时不能对现有web服务的http请求造成影响。

 

架构设计:

1 利用AOP获取controller层的reqeust和response,并根据自定义要求做Json序列化

2 AOP获取的内容发送到Kafka,利用MQ的特性,减少对http请求造成的延时

3 LogStash从Kafka中消费Json信息,对内容清洗后按照规则发送给不同的es索引

4 ElasticSearch存储内容

5 三种方式检索es中的内容:

 第一种:在服务器中搭建Kibana,所有用户利用该Kibana来操作es;(推荐)

 第二种:curl直接操作es

 第三种:客户端本地安装Kibana,利用客户端Kibana来操作es

 

端口号:

9200:ElasticSearch的端口

9092:Kafka的端口

2181:ZooKeeper的端口

5601: Kibana的端口

这些端口号是组建默认的端口,可以根据自己的需要进行配置,如果云平台有安全组限制或者实例中有防火墙的限制,需要打开它们。

准备资源:

几个控件可以分开安装,也可装在一台机器上,建议初期都在一台里,网络传输消耗少。

本篇重在介绍如何把它们串联起来,所有控件先只做单节点,而且这些控件集群配置相对都很简单,在以前的博客中都有介绍过。

最低配置:需要4核8G,或者两台2核4G的也可以。

AWS中最低是C4.xlarge 推荐C4.2xlarge

 

Linux版本:

CentOS-7-x86_64-GenericCloud-1802(小版本号无要求,centos7即可)

Jdk:jdk-8u171-linux-x64.tar.gz(小版本号无要求,jdk8即可)

Centos用户,如果是新机器需要重设下密码:

$sudo passwd centos

 

准备Java环境

Jdk:jdk-8u171-linux-x64.tar.gz
配置java环境
$vi ~/.bashrc
export JAVA_HOME=/opt/jdk/jdk1.8.0_171
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
$source ~/.bashrc

ElasticSearch:

下载安装

$wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.1.zip

$unzip elasticsearch-5.6.1.zip

最好安装一个中文插件(如果是纯英文场景可以跳过此步骤)

wget https://github.com/yejingtao/forblog/raw/master/ik/elasticsearch-analysis-ik-5.6.1.zip

中文插件的原因和使用方式:

https://blog.csdn.net/yejingtao703/article/details/78392902

配置:

涉及到的配置文件都在config下

1、elasticsearch.yml:elastic结点、集群的配置信息;

2、jvm.options:jvm的配置信息,里面找到默认启动内存是2G,最低可以改成512m

3、log4j2.properties:elastic的log的配置文件。

 

需要修改绑定地址,否则es不能被外部访问:

修改config 下的配置文件elasticsearch.yml,将#network.host: 192.168.0.1注释放开,同时修改成你对外的IP

启动:

$cd bin

$./elasticsearch

如果遇到这样的报错:

ERROR: [2] bootstrap checks failed

[1]: max file descriptors [4096] forelasticsearch process is too low, increase to at least [65536]

[2]: max virtual memory areas vm.max_map_count[65530] is too low, increase to at least [262144]

解决问题1:执行下命令ulimit -n 65536

解决问题2:需要修改/etc/sysctl.conf配置文件

echo "vm.max_map_count=262144">>/etc/sysctl.conf

sysctl –p//立刻生效

其中在解决ulimit -n 65536问题是被卡住了,原因是centos用户没有ulimit命令的权限

解决方案:

$sudo vi /etc/security/limits.conf
在配置文件的最后添加6行:
centos soft nproc 16384
centos hard nproc 16384
centos soft nofile 65536
centos hard nofile 65536
centos soft memlock 4000000
centos hard memlock 4000000
保存退出后需要重新切换一次centos用户
$su – centos

在外网的浏览器中确认能访问到elasticSearch(http://ip:9200)就可以确定安装完毕

 

建议做为服务器应用将elasticSearch转为后台运行:

$nohup elasticsearch-5.6.1/bin/elasticsearch &

Zookeeper

Zookeeper是使用Kafka的先决条件

下载安装:

$wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

$gunzip zookeeper-3.4.10.tar.gz

 

配置:

将/conf/下zoo_sample.cfg 重命名为 zoo.cfg

vim zoo.cfg   
dataDir=/tmp/zookeeper  (数据文件)
dataLogDir=/tmp/zookeeperlog (日志文件)

启动:

./bin/zkServer.sh start

2181是zk的默认端口,启动后可以telnet检查下是否启动成功(也可以不检查,因为zk启动失败的话,后面的kafka是不会启动成功的)

 

Kafka

下载安装:

$wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz

$gunzip kafka_2.11-2.1.1.tgz

 

配置:

$vi conf/server.properties

#listeners=PLAINTEXT://:9092改成自己的地址

advertised.listeners=PLAINTEXT也要改成外网地址,深坑,否则外面的生产者解析不到kafka的地址。

 

启动命令:

$bin/kafka-server-start.sh config/server.properties

同理上线使用时后台运行

$nohup bin/kafka-server-start.sh config/server.properties &

 

检查Kafka是否启动成功,调用以下命令查询下kafka的topic

$bin/kafka-topics.sh --list --zookeeper {yourip}:2181

 

准备和调试:

创建好topic,给接入代码使用

$bin/kafka-topics.sh --create --zookeeper {yourip}:2181 --replication-factor 1 --partitions 1 --topic httplog

像http日志收集这种应用场景,java代码只做生产者,可以直接用kafka自带的消费者来进行调试,不需要单独开发消费代码。

kafka消费者调测:

$bin/kafka-console-consumer.sh --bootstrap-server {yourip}:9092 --topic httplog

同理,kafka也自带生产者调测工具,这里不再介绍

 

Logstash:

下载安装:

$wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.3.zip

$unzip logstash-5.6.3.zip

 

配置:

logstash服务配置:

配置文件在HOME/config下

修改logstash.yml

# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
重新配置http.host

端口:
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700

这里注意下就好,主要根据这个端口范围来关心logstash的死活,这里把logstash架在kafka和elasticsearch之间,我们不会通过端口去访问它。

 

被监听的服务配置:

随便一个位置添加一个启动配置文件logstash.conf,建议也是放在logstash的home目录下,内容如下:

input {
  kafka {
    bootstrap_servers => ["10.100.1.142:9092"]
    client_id => "fwapi"
    group_id => "fwapi"
    auto_offset_reset => "latest"
    consumer_threads => 5
    topics => ["httplog"]
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  json{
    source => "message"
    target => "message"
  }
}

output {
  elasticsearch{
    hosts => ["10.100.1.142:9200"]
    index => "httplog-%{+YYYY.MM.dd}"
    timeout => 300
  }
}

启动命令:

$bin/logstash -f logstash.conf

上生产时需要转到后台

$nohup bin/logstash -f logstash.conf &

 

 

Kibana:

下载安装:

https://www.elastic.co/downloads/kibana

下载时请注意Kibana版本要与elasticsearc版本一致,否则会有下面这种坑

 

所以对应上述es版本的kibana安装包是:

https://artifacts.elastic.co/downloads/kibana/kibana-5.6.1-linux-x86_64.tar.gz

 

配置:

修改config/kibana.yml

#elasticsearch.hosts: ["http://localhost:9200"]
改成自己elasticsearch的地址
#server.host: "localhost"
改成自己的kibana地址

启动

$./kibana

上生产时需要后台运行

$ nohup ./kibana &

 

操作:

地址:http://hostname:5601/

Dev Tools里是curl的查询界面,

案例:

具体某一天中某一个接口的请求:GET httplog-2019.02.28/_search

如果按月查询:GET httplog-2019.02.*/_search

按年查询GET httplog-2019.*/_search

全量查询GET /_search

压力监控(QPS):

可以聚合和报表的类型:

 

成品案例:

 

Web服务代码改造:

集成Kafka:

pom.xml添加依赖:


	org.springframework.kafka
	spring-kafka

 添加kafka的配置:

spring.kafka.producer.bootstrap-servers=10.100.129.142:9092
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=40960
package com.fw.tester.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String servers;
    
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;
    
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;


    public Map producerConfigs() {
        Map props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
}

 利用切面捕获入库信息:

package com.fw.tester.aspect;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

 

/**
 * Aspect for http log
 */

@Aspect
@Order(5)
@Component
@Slf4j
public class WebLogJsonAspect {
	
	 @Autowired
	 private KafkaTemplate kafkaTemplate;
	 
	 @Value("${spring.kafka.enable}")
	 private boolean kafkaEnable;

    ThreadLocal startTime = new ThreadLocal<>();
    
    @Pointcut("execution(public * com.fw.tester.controller..*.*(..))")
    public void webLog(){}

    @Before("webLog()")
    public void doBefore(JoinPoint joinPoint) throws Throwable {
        startTime.set(System.currentTimeMillis());
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        LogRequest logRequest = new LogRequest(System.currentTimeMillis(), request.getRequestURL().toString(), request.getMethod(),
        		joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName(), JSON.toJSONString(joinPoint.getArgs()));
        log.info(JSON.toJSONString(logRequest));
        
        if(kafkaEnable) {
        	try {
            	kafkaTemplate.send("httplog",  JSON.toJSONString(logRequest));
                log.info("Send message to kafka successfully");
            } catch (Exception e) {
            	log.error("Send message to kafka unsuccessfully", e);
            	e.printStackTrace();
            }
        }
    }

 

    @AfterReturning(returning = "ret", pointcut = "webLog()")
    public void doAfterReturning(Object ret) throws Throwable {
    	HttpServletResponse respOnse= ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
    	LogResponse logRespOnse= new LogResponse(startTime.get(), response.getStatus(), ret, System.currentTimeMillis() - startTime.get());
    	log.info(JSON.toJSONString(logResponse));
    	if(kafkaEnable) {
         	try {
             	kafkaTemplate.send("httplog",  JSON.toJSONString(logResponse));
                 log.info("Send message to kafka successfully");
             } catch (Exception e) {
             	log.error("Send message to kafka unsuccessfully", e);
             	e.printStackTrace();
             }
         }
    }
    
    @AfterThrowing(throwing="ex", pointcut = "webLog()")
    public void doThrowing(Throwable ex){
    	LogResponse logRespOnse= new LogResponse(startTime.get(), HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), System.currentTimeMillis() - startTime.get());
    	log.info(JSON.toJSONString(logResponse));
    	if(kafkaEnable) {
         	try {
             	kafkaTemplate.send("httplog",  JSON.toJSONString(logResponse));
                 log.info("Send message to kafka successfully");
             } catch (Exception e) {
             	log.error("Send message to kafka unsuccessfully", e);
             	e.printStackTrace();
             }
         }
    }
    
    
    @Data
    @AllArgsConstructor
    class LogRequest {
    	private long traceId;
    	private String url;
    	private String httpMethod;
    	private String classMethod;
    	private String args;
    }
    
    @Data
    @AllArgsConstructor
    class LogResponse {
    	private long traceId;
    	private int status;
    	private Object response;
    	private long spendTime;
    }
    
}

 

 完整的示例代码请见:https://github.com/yejingtao/fw-tester

需要注意的细节:

在elasticsearch中尽量用“小索引”,利用Kibana的“大索引”做上层的封装,这样一旦出现问题方便索引的重建和恢复。

举例:

http日志每天都用新的索引来保存记录httplog-2019.03.02,在kibana中大索引配置成httplog-2019*,如果某一天因为新属性导致插入失败,只需要重建当天索引就好。


 


推荐阅读
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 当前,众多初创企业对全栈工程师的需求日益增长,但市场中却存在大量所谓的“伪全栈工程师”,尤其是那些仅掌握了Node.js技能的前端开发人员。本文旨在深入探讨全栈工程师在现代技术生态中的真实角色与价值,澄清对这一角色的误解,并强调真正的全栈工程师应具备全面的技术栈和综合解决问题的能力。 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 如何构建基于Spring MVC框架的Java Web应用项目
    在构建基于Spring MVC框架的Java Web应用项目时,首先应创建一个新的动态Web项目。接着,需将必要的JAR包导入至WebContent/WEB-INF/lib目录下,确保包括Spring核心库及相关依赖。如遇缺失的JAR包,可向社区求助或通过Maven等工具自动下载。正确配置后,即可开始搭建应用结构与功能模块。 ... [详细]
  • 本文将介绍一种扩展的ASP.NET MVC三层架构框架,并通过使用StructureMap实现依赖注入,以降低代码间的耦合度。该方法不仅能够提高代码的可维护性和可测试性,还能增强系统的灵活性和扩展性。通过具体实践案例,详细阐述了如何在实际开发中有效应用这一技术。 ... [详细]
  • 深入解析 Spring MVC 的核心原理与应用实践
    本文将详细探讨Spring MVC的核心原理及其实际应用,首先从配置web.xml文件入手,解析其在初始化过程中的关键作用,接着深入分析请求处理流程,包括控制器、视图解析器等组件的工作机制,并结合具体案例,展示如何高效利用Spring MVC进行开发,为读者提供全面的技术指导。 ... [详细]
  • SpringBoot启动脚本详解:BAT文件应用与基础入门指南(SpringBoot系列第1篇)
    如果你还在为SSM框架的复杂搭建过程和繁琐的配置文件而烦恼,那么SpringBoot将是你的一大福音。作为SpringBoot系列的第一篇文章,本文详细介绍了如何使用BAT文件来启动SpringBoot应用,并提供了基础入门指南,帮助开发者快速上手,简化开发流程。 ... [详细]
  • 开发笔记:校园商铺系统中店铺注册功能模块的Controller层优化与重构
    开发笔记:校园商铺系统中店铺注册功能模块的Controller层优化与重构 ... [详细]
  • 前期Linux环境准备1.修改Linux主机名2.修改IP3.修改主机名和IP的映射关系4.关闭防火墙5.ssh免登陆6.安装JDK,配置环境变量等集群规划主机 IP安装软件运行进 ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
  • 字节Java高级岗:java开发cpu吃多线程吗
    前言抱着侥幸心理投了字节跳动后台JAVA开发岗,居然收到通知去面试,一面下整个人来都是懵逼的,不知道我对着面试官都说了些啥(捂脸~~)。侥幸一面居然过了,三天后接到二面通知,结果这 ... [详细]
  • 「驭龙」开源主机入侵检测系统了解一下
    「驭龙」开源主机入侵检测系统了解一下 ... [详细]
  • FileBeat + Flume + Kafka + HDFS + Neo4j + SparkStreaming + MySQL:【案例】三度关系推荐V1.0版本11:每周一计算最近一月主播视频评级
    一、数据计算步骤汇总下面我们通过文字梳理一下具体的数据计算步骤。第一步:历史粉丝关注数据初始化第二步:实时维护粉丝关注数据第三步:每天定 ... [详细]
  • 【原创】七、Hadoop 2.5.2+zookeeper高可用部署
    一、原理(四大要点)(1)保证元数据一致(edits)namenode(fsimage edits)a、NFSb、journalnodec、zk(2)只有一台namenode对外提 ... [详细]
author-avatar
互粉-结局_596
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有