Docker in Practice: ELK

Building ELK with Docker

First we set up the ELK environment, then put Kafka in front of it as a buffer. The data flow is:

log-->logstash-agent-->kafka-->logstash-->es-->kibana

First, an overview of the file layout:

.
├── docker-compose.yml
├── kafka
│   ├── data
│   │   └── kafka-logs-e07760a64ec8
│   │       ├── cleaner-offset-checkpoint
│   │       ├── log-start-offset-checkpoint
│   │       ├── meta.properties
│   │       ├── recovery-point-offset-checkpoint
│   │       ├── replication-offset-checkpoint
│   │       └── test-0
│   │           ├── 00000000000000000000.index
│   │           ├── 00000000000000000000.log
│   │           ├── 00000000000000000000.timeindex
│   │           └── leader-epoch-checkpoint
│   └── docker.sock
├── kibana
│   └── kibana.yml
├── logstash
│   └── logstash.conf
└── zookeeper
    ├── data
    └── datalog

docker-compose.yml

version: "3"
services:
  elasticsearch:
    image: elasticsearch:7.7.0
    container_name: elasticsearch
    hostname: elasticsearch
    environment:
      - "discovery.type=single-node"
    ports:
      - 9200:9200
      - 9300:9300
    networks:
      - elknetwork
  kibana:
    image: kibana:7.7.0
    container_name: kibana
    hostname: kibana
    ports:
      - 5601:5601
    volumes:
      - ./kibana:/usr/share/kibana/config/
    links:
      - elasticsearch:elasticsearch
    depends_on:
      - elasticsearch
    networks:
      - elknetwork
  logstash:
    image: logstash:7.7.0
    container_name: logstash
    hostname: logstash
    ports:
      - 9600:9600
      - 8089:8089
    volumes:
      - ./logstash:/usr/share/logstash/pipeline/
    links:
      - elasticsearch:elasticsearch
    depends_on:
      - elasticsearch
    networks:
      - elknetwork
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - "2181:2181"
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
    networks:
      - elknetwork
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 # your host IP (look it up with ifconfig), or simply "kafka"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9092 ## change this to the host IP
    volumes:
      - "./kafka/docker.sock:/var/run/docker.sock"
      - "./kafka/data/:/kafka"
    depends_on:
      - zookeeper
    networks:
      - elknetwork
networks:
  elknetwork:
    driver: bridge

Note: Logstash stops its own process when it has no data source, so don't panic if the container exits.
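
With the compose file in place, bring the stack up and do a quick sanity check that Elasticsearch responds (the container names and ports are the ones defined above):

# Start all containers in the background
docker-compose up -d
# Check that the containers are running
docker ps
# Elasticsearch should answer with its version info on port 9200
curl http://localhost:9200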

kibana.yml

server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"
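
Once Kibana has started (it can take a minute after Elasticsearch is ready), its status API is an optional but convenient way to confirm it reached Elasticsearch:

# A green/"available" overall status means Kibana is connected to Elasticsearch
curl http://localhost:5601/api/status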

logstash.conf

input {
  tcp {
    port => 8089
  }
}

output {
  elasticsearch { hosts => ["elasticsearch:9200"] }
}
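
With this pipeline loaded, anything written to TCP port 8089 ends up in Elasticsearch. A minimal smoke test from the host, assuming netcat (nc) is installed:

# Send one line to the Logstash TCP input; it should show up as a document in the logstash-* index shortly after
echo '{"message": "hello from nc"}' | nc localhost 8089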

Default Logstash configuration inside the container (/usr/share/logstash/config/pipelines.yml)

- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"

Default Logstash configuration inside the container (/usr/share/logstash/config/logstash.yml)

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]

Getting familiar with the default configuration inside the container

# Enter the container
docker exec -it logstash /bin/bash
# List the files
ls
# View the configuration file inside the container
more config/logstash.yml
  • In the Logstash container, /usr/share/logstash/config/pipelines.yml points the pipeline configuration at the /usr/share/logstash/pipeline directory, which is where our real configuration files live (see the check below).
  • In the Kibana container, /usr/share/kibana/config/kibana.yml can be edited to switch the UI language to Chinese.
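
To confirm that the bind mount from the compose file actually took effect, a quick check is to read the pipeline file from inside the container; it should match ./logstash/logstash.conf on the host:

# Print the pipeline file that Logstash actually loads
docker exec logstash cat /usr/share/logstash/pipeline/logstash.conf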

Sending logs from Log4j2 to Logstash over TCP

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.2</version>
</dependency> -->

log4j2-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF" monitorInterval="60">
    <Appenders>
        <!-- Console appender: only prints messages at "level" and above, with a color per log level -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- The console accepts messages at "level" and above (onMatch) and rejects everything else (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}"/>
        </Console>
        <!-- Socket appender: ships logs to Logstash for collection -->
        <Socket name="Socket" host="127.0.0.1" port="8089" protocol="TCP">
            <!-- An appender takes a single layout; JsonLayout is used so Logstash receives structured events -->
            <JsonLayout properties="true" compact="true" eventEol="true" />
            <!-- A plain-text alternative (do not enable both layouts at once):
            <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n"/>
            -->
        </Socket>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <property name="hostName">cuishiying</property>
            <property name="applicationName">elk-demo</property>
            <appender-ref ref="Console"/>
            <appender-ref ref="Socket"/>
        </Root>
    </Loggers>
</Configuration>

Testing

@RestController
@RequestMapping("/test")
public class LogController {

    private Logger logger = LogManager.getLogger(LogController.class);

    // http://127.0.0.1:8080/test/log4j2
    @RequestMapping(value = "/log4j2", method = RequestMethod.GET)
    public String testLog() {
        try {
            MDC.put("traceid", String.valueOf(System.currentTimeMillis()));
            logger.info("Hello, this is an info message.");
            logger.error("Hello, this is an error message.");
            logger.warn("Hello, this is a warn message.");
            logger.debug("Hello, this is a debug message.");
            // Deliberately cause an IndexOutOfBoundsException so the stack trace shows up in ELK
            List<String> list = new ArrayList<>();
            System.out.println(list.get(2));
        } catch (Exception e) {
            logger.error("testLog", e);
        } finally {
            MDC.clear();
        }
        return "";
    }
}

After the application starts, click Management in Kibana and you can see that Logstash has automatically created an index in Elasticsearch following its default naming rule.
We still need to create the index pattern in Kibana by hand;
after that the logs show up under Discover.
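
To generate a few log entries, hit the test endpoint from the controller above:

# Each call produces info/error/warn messages plus an exception stack trace
curl http://127.0.0.1:8080/test/log4j2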

When logs are shipped to Logstash over a plain socket, stopping and then restarting Logstash breaks the connection and the logs stop flowing: the Socket appender does not reconnect automatically, which is clearly a risk in production. That is why production setups usually use logstash-gelf instead.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Properties>
        <Property name="LOG_PATTERN">{"logger": "%logger", "level": "%level", "msg": "%message"}%n</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" follow="true">
            <PatternLayout pattern="${LOG_PATTERN}"/>
        </Console>
        <Gelf name="logstash-gelf" host="udp:127.0.0.1" port="4567" version="1.1" ignoreExceptions="true">
            <Field name="timestamp" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}" />
            <Field name="logger" pattern="%logger" />
            <Field name="level" pattern="%level" />
            <Field name="simpleClassName" pattern="%C{1}" />
            <Field name="className" pattern="%C" />
            <Field name="server" pattern="%host" />
        </Gelf>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="logstash-gelf" />
        </Root>
    </Loggers>
</Configuration>
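
The <Gelf> appender above comes from the logstash-gelf library (typically the biz.paluch.logging:logstash-gelf dependency), and on the Logstash side the tcp input has to be replaced by a gelf input listening on the matching UDP port. A quick way to check whether the gelf input plugin is available in the container:

# List the installed Logstash plugins and look for the gelf input
docker exec logstash bin/logstash-plugin list | grep gelf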

Sending logs from Log4j2 to Kafka

log4j2-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF" monitorInterval="60">
    <Properties>
        <property name="log_pattern_console">%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}</property>
        <!-- Default file log pattern: %X{traceid}: trace id; %C: class name; %M: method name; %l: location/line number (costs performance); %m: message; %n: newline -->
        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %l - %m%n</property>
    </Properties>
    <Appenders>
        <!-- Console appender: only prints messages at "level" and above, with a color per log level -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- The console accepts messages at "level" and above (onMatch) and rejects everything else (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${log_pattern_console}"/>
        </Console>
        <!-- Kafka appender: ships logs to Kafka, where Logstash collects them -->
        <Kafka name="Kafka" topic="test">
            <PatternLayout pattern="${log_pattern}"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <property name="hostName">cuishiying</property>
            <property name="app_name">elk-demo</property>
            <appender-ref ref="Console"/>
            <!--<appender-ref ref="Socket"/>-->
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>

At this point the application writes its logs directly to Kafka.
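
Note that Log4j2's KafkaAppender needs the Kafka client library (org.apache.kafka:kafka-clients) on the classpath; the commented-out spring-kafka dependency above would pull it in transitively. To confirm that messages really land in the topic, consume it from inside the Kafka container (the path of the Kafka scripts may differ between image versions):

# Read everything from the "test" topic; log lines should scroll by as the application writes them
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning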

Shipping Log4j2 logs to ELK through Kafka

Modify logstash.conf so that the input is Kafka and the output is Elasticsearch, and build the ES index name from the Kafka topic and the date.

logstash.conf

input {
  kafka {
    bootstrap_servers => ["kafka:9092"]
    topics_pattern => "test*"
    group_id => "logstash-test-group"
    consumer_threads => 5
    codec => "json"               # the logstash kafka plugin's default codec for input and output is json
    decorate_events => true       # also adds the current topic, offset, group, partition, etc. to the event
    auto_offset_reset => "latest"
    type => "test"                # common option on all plugins; especially useful when the input section has several sources
  }
  kafka {
    bootstrap_servers => ["kafka:9092"]
    topics_pattern => "ocpc*"
    group_id => "logstash-ocpc-group"
    consumer_threads => 5
    codec => "json"
    decorate_events => true       # also adds the current topic, offset, group, partition, etc. to the event
    auto_offset_reset => "latest"
    type => "ocpc"                # common option on all plugins; especially useful when the input section has several sources
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => "@version"
  }
}

output {
  if [type] == "test" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "%{[@metadata][kafka][topic]}-%{app_name}-%{+YYYYMMdd}"
      manage_template => false    # disable Logstash's own index template so that the template defined in ES is used
    }
    stdout {
      codec => rubydebug { metadata => true }   # print events, including @metadata, to the Logstash console
    }
  }
  if [type] == "ocpc" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "%{[@metadata][kafka][topic]}-%{+YYYYMMdd}"
      manage_template => false    # disable Logstash's own index template so that the template defined in ES is used
    }
    stdout {
      codec => rubydebug { metadata => true }   # print events, including @metadata, to the Logstash console
    }
  }
}

Restart the Logstash container

docker-compose restart logstash

Open http://localhost:5601 and you can see that the logs, including exception stack traces, now flow into ELK correctly.
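
Before creating the index pattern in Kibana, it can help to confirm that the expected indices exist in Elasticsearch; this is just an optional check:

# The Kafka-based pipeline should have created indices named after the topic and date
curl "http://localhost:9200/_cat/indices?v"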

Log trace correlation

The MDC traceid we set by hand in the controller above can instead be populated uniformly by a filter.

package com.easyliao.auth.common.filter;


import com.easyliao.auth.common.utils.IdUtils;
import com.easyliao.auth.common.utils.IpUtil;
import com.easyliao.auth.common.utils.RequestUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Slf4j
@WebFilter(urlPatterns = "/*", filterName = "logFilter")
@Order(value = Ordered.HIGHEST_PRECEDENCE)
@Component
public class LogFilter extends OncePerRequestFilter {

    private final String TRACE_ID = "traceid";
    private final String IP = "ip";
    private final String DEFAULT_TRACE_ID = "0";

    @Override
    protected void doFilterInternal(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, FilterChain filterChain) throws ServletException, IOException {
        try {
            // Initialize the traceId
            initTraceId(httpServletRequest);
            // Initialize the client IP
            initIp(httpServletRequest);
            // Log the incoming request
            log(httpServletRequest, httpServletResponse);
            // Continue with the rest of the filter chain
            filterChain.doFilter(httpServletRequest, httpServletResponse);
        } finally {
            afterLog(httpServletRequest, httpServletResponse);
        }
    }

    private void log(HttpServletRequest request, HttpServletResponse response) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("\nRequest URL: [{}] \nRequest params: [{}]",
                request.getRequestURL().toString(),
                RequestUtils.getFormParams(request)
        );
    }

    private void afterLog(HttpServletRequest req, HttpServletResponse response) {
        MDC.remove(TRACE_ID);
        MDC.remove(IP);
    }

    private void initIp(HttpServletRequest servletRequest) {
        MDC.put(IP, IpUtil.getIpAddr(servletRequest));
    }

    private void initTraceId(HttpServletRequest request) {
        // Try to read the traceId from the HTTP request
        String traceId = request.getParameter(TRACE_ID);

        // If the current traceId is blank or the default one, generate a new traceId
        if (StringUtils.isBlank(traceId) || this.defaultTraceId(traceId)) {
            traceId = this.genTraceId();
        }

        // Store the traceId
        this.setTraceId(traceId);
    }

    public Boolean defaultTraceId(String traceId) {
        return DEFAULT_TRACE_ID.equals(traceId);
    }

    public String genTraceId() {
        return IdUtils.uuid();
    }

    public void setTraceId(String traceId) {
        // If the argument is blank, fall back to the default traceId
        traceId = StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
        // Put the traceId into the MDC
        MDC.put(TRACE_ID, traceId);
    }

    public String getTraceId() {
        // Read the traceId from the MDC
        String traceId = MDC.get(TRACE_ID);
        // If it is blank, return the default value
        return StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
    }
}
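
Because the filter first looks for a traceid request parameter, an upstream caller can propagate its own trace id. Just as an illustration of the behaviour (assuming the filter is registered in the demo application):

# The provided traceid is put into the MDC and appears in every log line written for this request
curl "http://127.0.0.1:8080/test/log4j2?traceid=abc-123"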

Logback + Kafka + ELK

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>elk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>elk-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- scan: when true, the configuration file is reloaded whenever it changes (default: true).
     scanPeriod: how often to check for changes; the value defaults to milliseconds if no unit is given
     and only takes effect when scan is true (default: 1 minute).
     debug: when true, Logback prints its internal status messages so you can watch it run (default: false). -->
<!-- <configuration scan="false" scanPeriod="60 seconds" debug="false"> -->
<configuration>

    <!-- Default log pattern: %X{traceid}: trace id; %C: class name; %M: method name; %L: line number (costs performance); %m: message; %n: newline -->
    <property name="log_pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %L - %m%n"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
    </appender>

    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
        <topic>test</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <!-- Optional parameter to use a fixed partition -->
        <!-- <partition>0</partition> -->

        <!-- Optional parameter to include log timestamps into the kafka message -->
        <!-- <appendTimestamp>true</appendTimestamp> -->

        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>

        <!-- this is the fallback appender if kafka is not available. -->
        <appender-ref ref="STDOUT" />
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="kafkaAppender" />
    </root>
</configuration>

application.properties

logging.config=classpath:logback-spring.xml
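
With the dependencies and configuration in place, run the application and generate some traffic, then check Kibana as before. A short sketch, assuming Maven is installed and the test controller from earlier is part of the project:

# Start the Spring Boot application
mvn spring-boot:run
# In another terminal, trigger some log output
curl http://127.0.0.1:8080/test/log4j2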

Wrapping up

That's all for this post, thanks for reading. If you found it useful, please follow the WeChat official account 【当我遇上你】; your support is the biggest motivation for my writing.

References