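Setting up ELK with Docker
First we set up the ELK environment, then introduce Kafka as a buffer. The data flow is:
log --> logstash-agent --> kafka --> logstash --> es --> kibana
To begin, here is an overview of the file structure: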
.
├── docker-compose.yml
├── kafka
│   ├── data
│   │   └── kafka-logs-e07760a64ec8
│   │       ├── cleaner-offset-checkpoint
│   │       ├── log-start-offset-checkpoint
│   │       ├── meta.properties
│   │       ├── recovery-point-offset-checkpoint
│   │       ├── replication-offset-checkpoint
│   │       └── test-0
│   │           ├── 00000000000000000000.index
│   │           ├── 00000000000000000000.log
│   │           ├── 00000000000000000000.timeindex
│   │           └── leader-epoch-checkpoint
│   └── docker.sock
├── kibana
│   └── kibana.yml
├── logstash
│   └── logstash.conf
└── zookeeper
    ├── data
    └── datalog
docker-compose-elk.yml
version: "3"
services:
  elasticsearch:
    image: elasticsearch:7.7.0
    container_name: elasticsearch
    hostname: elasticsearch
    environment:
      - "discovery.type=single-node"
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - elknetwork
  kibana:
    image: kibana:7.7.0
    container_name: kibana
    hostname: kibana
    ports:
      - 5601:5601
    volumes:
      - ./kibana:/usr/share/kibana/config/
    links:
      - elasticsearch:elasticsearch
    depends_on:
      - elasticsearch
    networks:
      - elknetwork
  logstash:
    image: logstash:7.7.0
    container_name: logstash
    hostname: logstash
    ports:
      - 9600:9600
      - 8089:8089
    volumes:
      - ./logstash:/usr/share/logstash/pipeline/
    links:
      - elasticsearch:elasticsearch
    depends_on:
      - elasticsearch
    networks:
      - elknetwork
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - "2181:2181"
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
    networks:
      - elknetwork
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9092
    volumes:
      - "./kafka/docker.sock:/var/run/docker.sock"
      - "./kafka/data/:/kafka"
    depends_on:
      - zookeeper
    networks:
      - elknetwork

networks:
  elknetwork:
    driver: bridge
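Once the stack is up (for example via `docker-compose up -d`), a quick reachability check on the published ports can save some guessing. The following is only a minimal sketch using the JDK; the class name `PortCheck` and the service labels are made up for illustration, and the port numbers are simply the ones published in the compose file above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: checks whether the ports published by the compose file accept TCP connections. */
class PortCheck {
    // Labels are illustrative; ports are taken from the compose file.
    static final Map<String, Integer> SERVICES = new LinkedHashMap<>();
    static {
        SERVICES.put("elasticsearch", 9200);
        SERVICES.put("kibana", 5601);
        SERVICES.put("logstash-tcp", 8089);
        SERVICES.put("zookeeper", 2181);
        SERVICES.put("kafka", 9092);
    }

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        for (Map.Entry<String, Integer> entry : SERVICES.entrySet()) {
            String state = isOpen(host, entry.getValue(), 1_000) ? "open" : "closed";
            System.out.printf("%-14s %5d %s%n", entry.getKey(), entry.getValue(), state);
        }
    }
}
```

An open port only shows that something is listening; it does not prove the service behind it has finished starting.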
Note: Logstash stops its process on its own when it has no data source. This is expected, so don't panic.
kibana.yml
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
logstash.conf
input {
  tcp {
    port => 8089
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
  }
}
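To check the tcp input before wiring up an application, one newline-terminated line can be pushed to port 8089 by hand. The sketch below uses only the JDK; the class name `TcpSmokeTest` and the event fields are assumptions for illustration, not something the pipeline requires. As far as I know the tcp input defaults to a line-oriented codec, so the JSON text should arrive unparsed in the `message` field.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical smoke test: sends one newline-terminated JSON event to the Logstash tcp input. */
class TcpSmokeTest {

    /** Builds a single-line JSON event; only backslashes and double quotes are escaped. */
    static String buildEvent(String level, String message) {
        return "{\"level\":\"" + escape(level) + "\",\"message\":\"" + escape(message) + "\"}\n";
    }

    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Opens a connection, writes the line as UTF-8, and closes the connection. */
    static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) {
        try {
            // localhost:8089 assumes the port mapping from the compose file.
            send("localhost", 8089, buildEvent("INFO", "hello from the smoke test"));
            System.out.println("event sent");
        } catch (IOException e) {
            System.out.println("could not reach Logstash: " + e.getMessage());
        }
    }
}
```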
Default Logstash configuration inside the Docker image (/usr/share/logstash/config/pipelines.yml)
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"
Default Logstash configuration inside the Docker image (/usr/share/logstash/config/logstash.yml)
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
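Getting familiar with the default configuration inside the containers
docker exec -it logstash /bin/bash

ls

more config/logstash.yml
In Logstash, /usr/share/logstash/config/pipelines.yml points the pipeline configuration at the /usr/share/logstash/pipeline directory, which is where our actual configuration files live.
In Kibana, /usr/share/kibana/config/kibana.yml can be edited to switch the UI to Chinese (the i18n.locale setting shown above).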
Sending log4j2 logs to Logstash over TCP
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <!-- Exclude the default Logback starter so log4j2 can take over -->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
log4j2-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF" monitorInterval="60">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}"/>
        </Console>

        <!-- Ships each event to the Logstash tcp input as one JSON document per line -->
        <Socket name="Socket" host="127.0.0.1" port="8089" protocol="TCP">
            <JsonLayout properties="true" compact="true" eventEol="true"/>
            <!-- An appender accepts a single layout, so the plain-text alternative is left commented out:
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n"/>
            -->
        </Socket>
    </Appenders>

    <Loggers>
        <Root level="INFO">
            <property name="hostName">cuishiying</property>
            <property name="applicationName">elk-demo</property>
            <appender-ref ref="Console"/>
            <appender-ref ref="Socket"/>
        </Root>
    </Loggers>
</Configuration>
Testing
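import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/test")
public class LogController {

    private Logger logger = LogManager.getLogger(LogController.class);

    @RequestMapping(value = "/log4j2", method = RequestMethod.GET)
    public String testLog() {
        try {
            // Tag every log line of this request with a trace id
            MDC.put("traceid", String.valueOf(System.currentTimeMillis()));
            logger.info("Hello, this is an info message.");
            logger.error("Hello, this is an error message.");
            logger.warn("Hello, this is a warn message.");
            logger.debug("Hello, this is a debug message.");

            // Deliberately throws IndexOutOfBoundsException to produce a stack trace
            List<String> list = new ArrayList<>();
            System.out.println(list.get(2));
        } catch (Exception e) {
            logger.error("testLog", e);
        } finally {
            MDC.clear();
        }
        return "";
    }
}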
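After the project starts, click Management in Kibana.
Logstash automatically creates an index in ES according to its default rules. The Kibana index pattern has to be created manually,
after which the logs are visible in Discover.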
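Before creating the index pattern, it can help to confirm that the index actually exists in ES. The sketch below calls the `_cat/indices` endpoint with the JDK's `HttpURLConnection`. The class name `ListIndices`, the base URL `http://localhost:9200`, and the `logstash-*` pattern are assumptions here (the last one based on Logstash's default index naming); adjust them to your setup.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: lists Elasticsearch indices matching a pattern via the _cat API. */
class ListIndices {

    /** Builds the _cat/indices URL; "?v" asks ES to include a header row. */
    static String catIndicesUrl(String esBaseUrl, String pattern) {
        String base = esBaseUrl.endsWith("/")
                ? esBaseUrl.substring(0, esBaseUrl.length() - 1)
                : esBaseUrl;
        return base + "/_cat/indices/" + pattern + "?v";
    }

    /** Performs a GET request and returns the response body as text. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(2_000);
        conn.setReadTimeout(5_000);
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line).append('\n');
            }
        } finally {
            conn.disconnect();
        }
        return body.toString();
    }

    public static void main(String[] args) {
        try {
            // Assumed defaults: ES published on localhost:9200, indices named logstash-*.
            System.out.print(fetch(catIndicesUrl("http://localhost:9200", "logstash-*")));
        } catch (IOException e) {
            System.out.println("could not query Elasticsearch: " + e.getMessage());
        }
    }
}
```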
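When logs are shipped to Logstash over a socket in this setup, stopping Logstash and starting it again leaves log shipping broken; in other words, the socket does not reconnect automatically. That is clearly a hazard in production, so production environments generally use logstash-gelf instead: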
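To make the reconnect problem concrete, here is a rough sketch of what a sender has to do to survive a Logstash restart: drop the broken socket, wait, and open a new one. This is not how log4j2 or logstash-gelf is implemented; the class `ReconnectingSender` and its backoff values are invented for illustration. Note also that a TCP write can appear to succeed shortly after the peer has gone away, so delivery remains best-effort.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical line sender that reopens its TCP connection after a failure. */
class ReconnectingSender implements AutoCloseable {
    private final String host;
    private final int port;
    private Socket socket;

    ReconnectingSender(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Delay before the next attempt (0-based): 100 ms, doubling each time, capped at 5 s. */
    static long backoffMillis(int attempt) {
        long delay = 100L << Math.min(attempt, 10);
        return Math.min(delay, 5_000L);
    }

    /** Sends one line, reconnecting between attempts; returns false if every attempt fails. */
    boolean send(String line, int maxAttempts) throws InterruptedException {
        byte[] payload = (line + "\n").getBytes(StandardCharsets.UTF_8);
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (socket == null || socket.isClosed()) {
                    socket = new Socket();
                    socket.connect(new InetSocketAddress(host, port), 1_000);
                }
                OutputStream out = socket.getOutputStream();
                out.write(payload);
                out.flush();
                return true;
            } catch (IOException e) {
                close(); // discard the broken connection so the next attempt reconnects
                Thread.sleep(backoffMillis(attempt));
            }
        }
        return false;
    }

    @Override
    public void close() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if closing fails
            }
            socket = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (ReconnectingSender sender = new ReconnectingSender("localhost", 8089)) {
            System.out.println(sender.send("{\"message\":\"ping\"}", 3) ? "sent" : "gave up");
        }
    }
}
```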
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Properties>
        <Property name="LOG_PATTERN">{"logger": "%logger", "level": "%level", "msg": "%message"}%n</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" follow="true">
            <PatternLayout pattern="${LOG_PATTERN}"/>
        </Console>
        <Gelf name="logstash-gelf" host="udp:127.0.0.1" port="4567" version="1.1" ignoreExceptions="true">
            <Field name="timestamp" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}"/>
            <Field name="logger" pattern="%logger"/>
            <Field name="level" pattern="%level"/>
            <Field name="simpleClassName" pattern="%C{1}"/>
            <Field name="className" pattern="%C"/>
            <Field name="server" pattern="%host"/>
        </Gelf>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="logstash-gelf"/>
        </Root>
    </Loggers>
</Configuration>
Sending log4j2 logs to Kafka
log4j2-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF" monitorInterval="60">
    <Properties>
        <property name="log_pattern_console">%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}</property>
        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %l - %m%n</property>
    </Properties>

    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${log_pattern_console}"/>
        </Console>

        <Kafka name="Kafka" topic="test">
            <PatternLayout pattern="${log_pattern}"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <Root level="INFO">
            <property name="hostName">cuishiying</property>
            <property name="app_name">elk-demo</property>
            <appender-ref ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>
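At this point, logs are written directly to Kafka.
Shipping log4j2 logs to ELK through Kafka
Modify logstash.conf so that the input is Kafka and the output is ES, with ES indices named after the Kafka topic and the date.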
logstash.conf
input {
  kafka {
    bootstrap_servers => ["kafka:9092"]
    topics_pattern => "test*"
    group_id => "logstash-test-group"
    consumer_threads => 5
    codec => "json"             # parse each Kafka message as JSON
    decorate_events => true     # also attaches the topic, offset, group, partition, etc. to the event
    auto_offset_reset => "latest"
    type => "test"              # option common to all plugins; especially useful with multiple inputs
  }
  kafka {
    bootstrap_servers => ["kafka:9092"]
    topics_pattern => "ocpc*"
    group_id => "logstash-ocpc-group"
    consumer_threads => 5
    codec => "json"
    decorate_events => true     # also attaches the topic, offset, group, partition, etc. to the event
    auto_offset_reset => "latest"
    type => "ocpc"              # option common to all plugins; especially useful with multiple inputs
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => "@version"
  }
}

output {
  if [type] == "test" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "%{[@metadata][kafka][topic]}-%{app_name}-%{+YYYYMMdd}"
      manage_template => false  # disable Logstash's own template so ES falls back to its built-in templates
    }
    stdout {
      codec => rubydebug { metadata => true }  # print events and @metadata to the Logstash console
    }
  }

  if [type] == "ocpc" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "%{[@metadata][kafka][topic]}-%{+YYYYMMdd}"
      manage_template => false  # disable Logstash's own template so ES falls back to its built-in templates
    }
    stdout {
      codec => rubydebug { metadata => true }  # print events and @metadata to the Logstash console
    }
  }
}
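To see which index a given event should land in, the naming rule `%{[@metadata][kafka][topic]}-%{app_name}-%{+YYYYMMdd}` can be mirrored in a few lines of Java. This is only an illustration of the naming scheme, not Logstash code; the class `IndexNames` is invented, and it assumes the date part is derived from the event's `@timestamp` in UTC, which is my understanding of how Logstash formats `%{+...}` references.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that mirrors the topic-app-date index naming used in logstash.conf. */
class IndexNames {
    // Logstash's YYYY (Joda-style) corresponds to yyyy in java.time;
    // YYYY in java.time would mean the week-based year instead.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Returns topic-appName-yyyyMMdd, with the day computed in UTC. */
    static String indexName(String topic, String appName, Instant timestamp) {
        return topic + "-" + appName + "-" + DAY.format(timestamp);
    }

    public static void main(String[] args) {
        // Prints today's index for the "test" topic and the "elk-demo" application.
        System.out.println(indexName("test", "elk-demo", Instant.now()));
    }
}
```

Two things worth keeping in mind when reading the result: an event logged late in the evening local time may fall into the next day's index because of the UTC conversion, and if an event carries no `app_name` field, Logstash is expected to leave the literal `%{app_name}` text in the index name rather than fail.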
Restart the Logstash container
docker-compose restart logstash
Visit http://localhost:5601 to confirm that the logs, including exception stack traces, are now flowing into ELK.
Log tracing
The MDC call above that stores the traceid can be moved into a filter, so that it is applied uniformly to every request.
package com.easyliao.auth.common.filter;

import com.easyliao.auth.common.utils.IdUtils;
import com.easyliao.auth.common.utils.IpUtil;
import com.easyliao.auth.common.utils.RequestUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Slf4j
@WebFilter(urlPatterns = "/*", filterName = "logFilter")
@Order(value = Ordered.HIGHEST_PRECEDENCE)
@Component
public class LogFilter extends OncePerRequestFilter {

    private final String TRACE_ID = "traceid";
    private final String IP = "ip";
    private final String DEFAULT_TRACE_ID = "0";

    @Override
    protected void doFilterInternal(HttpServletRequest httpServletRequest,
                                    HttpServletResponse httpServletResponse,
                                    FilterChain filterChain) throws ServletException, IOException {
        try {
            initTraceId(httpServletRequest);
            initIp(httpServletRequest);
            log(httpServletRequest, httpServletResponse);
            filterChain.doFilter(httpServletRequest, httpServletResponse);
        } finally {
            // Always clean up, since the thread is reused for later requests
            afterLog(httpServletRequest, httpServletResponse);
        }
    }

    private void log(HttpServletRequest request, HttpServletResponse response) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("\nRequest URL: [{}] \nRequest params: [{}]",
                request.getRequestURL().toString(),
                RequestUtils.getFormParams(request)
        );
    }

    private void afterLog(HttpServletRequest req, HttpServletResponse response) {
        MDC.remove(TRACE_ID);
        MDC.remove(IP);
    }

    private void initIp(HttpServletRequest servletRequest) {
        MDC.put(IP, IpUtil.getIpAddr(servletRequest));
    }

    private void initTraceId(HttpServletRequest request) {
        // Reuse the caller's traceid if one was passed, otherwise generate a new one
        String traceId = request.getParameter(TRACE_ID);
        if (StringUtils.isBlank(traceId) || this.defaultTraceId(traceId)) {
            traceId = this.genTraceId();
        }
        this.setTraceId(traceId);
    }

    public Boolean defaultTraceId(String traceId) {
        return DEFAULT_TRACE_ID.equals(traceId);
    }

    public String genTraceId() {
        return IdUtils.uuid();
    }

    public void setTraceId(String traceId) {
        traceId = StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
        MDC.put(TRACE_ID, traceId);
    }

    public String getTraceId() {
        String traceId = MDC.get(TRACE_ID);
        return StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
    }
}
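The core idea of the filter (set per-thread context before the work, always remove it afterwards) can be shown without Spring or SLF4J. The following is a simplified stand-in built on a plain `ThreadLocal`; the class `TraceContext` and its methods are invented for this sketch and only imitate what MDC does conceptually.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for MDC: a per-thread key/value context. */
class TraceContext {
    static final String DEFAULT_TRACE_ID = "0";

    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    static void clear() {
        CONTEXT.remove();
    }

    /** Keeps an incoming id, or generates a UUID when it is missing, blank, or the default "0". */
    static String resolveTraceId(String incoming) {
        boolean missing = incoming == null
                || incoming.trim().isEmpty()
                || DEFAULT_TRACE_ID.equals(incoming);
        return missing ? UUID.randomUUID().toString() : incoming;
    }

    /** Runs the work with a traceid set, then clears the context even if the work throws. */
    static <T> T withTrace(String incoming, Supplier<T> work) {
        put("traceid", resolveTraceId(incoming));
        try {
            return work.get();
        } finally {
            clear();
        }
    }

    public static void main(String[] args) {
        String seen = withTrace(null, () -> get("traceid"));
        System.out.println("traceid during request: " + seen);
        System.out.println("traceid after request: " + get("traceid"));
    }
}
```

The `finally` block matters because servlet containers reuse worker threads: without the cleanup, a traceid from one request could leak into the log lines of the next request served by the same thread.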
logback+kafka+elk
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>elk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>elk-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <property name="log_pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %L - %m%n"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
    </appender>

    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>

        <topic>test</topic>

        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>

        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>

        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>

        <!-- Fallback appender used when Kafka is unavailable -->
        <appender-ref ref="STDOUT"/>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="kafkaAppender"/>
    </root>
</configuration>
application.properties
logging.config=classpath:logback-spring.xml
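With this configuration, each Kafka message is a plain-text line shaped by `log_pattern` (timestamp, traceid, level, line number, message). The sketch below shows roughly what such a line looks like and how its fields could be pulled apart with a regular expression, for instance when checking messages read from the topic. The class `LogLineParser` and the sample line are made up for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for lines shaped like: 2020-10-01 12:00:00.123 [traceid] [INFO ] 42 - message */
class LogLineParser {
    private static final Pattern LINE = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})" // %d{yyyy-MM-dd HH:mm:ss.SSS}
            + " \\[([^\\]]*)\\]"                                    // [%X{traceid}], may be empty
            + " \\[(\\w+)\\s*\\]"                                   // [%-5level], padded with spaces
            + " (\\S+)"                                             // %L, the line number
            + " - (.*)$");                                          // %m, the message

    /** Returns {timestamp, traceid, level, line, message}, or null if the line does not match. */
    static String[] parse(String line) {
        String trimmed = line.replaceAll("[\\r\\n]+$", ""); // drop the trailing %n
        Matcher m = LINE.matcher(trimmed);
        if (!m.matches()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2), m.group(3), m.group(4), m.group(5)};
    }

    public static void main(String[] args) {
        String[] fields = parse("2020-10-01 12:00:00.123 [3f2a] [INFO ] 42 - hello\n");
        System.out.println(String.join(" | ", fields));
    }
}
```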
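Finally
That's all for this article; thanks for reading. If you found it useful, please follow the WeChat official account 【当我遇上你】. Your support is my biggest motivation to keep writing.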