docker搭建ELK 
首先我们搭建ELK环境, 然后再引入缓存Kafka, 数据流为:
1 log-->logstash-agent-->kafka-->logstash-->es-->kibana 
 
首先全局了解下文件结构 
.
├── docker-compose.yml
├── kafka
│   ├── data
│   │   └── kafka-logs-e07760a64ec8
│   │       ├── cleaner-offset-checkpoint
│   │       ├── log-start-offset-checkpoint
│   │       ├── meta.properties
│   │       ├── recovery-point-offset-checkpoint
│   │       ├── replication-offset-checkpoint
│   │       └── test-0
│   │           ├── 00000000000000000000.index
│   │           ├── 00000000000000000000.log
│   │           ├── 00000000000000000000.timeindex
│   │           └── leader-epoch-checkpoint
│   └── docker.sock
├── kibana
│   └── kibana.yml
├── logstash
│   └── logstash.conf
└── zookeeper
    ├── data
    └── datalog
 
docker-compose.yml 
---
# Single-host logging stack: Elasticsearch, Kibana and Logstash plus ZooKeeper
# and Kafka. Every service joins the bridge network "elknetwork" defined at
# the bottom of this file.
version: "3"

services:
  elasticsearch:
    image: elasticsearch:7.7.0
    container_name: elasticsearch
    hostname: elasticsearch
    environment:
      - "discovery.type=single-node"
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - elknetwork

  kibana:
    image: kibana:7.7.0
    container_name: kibana
    hostname: kibana
    ports:
      - "5601:5601"
    volumes:
      # Supplies ./kibana/kibana.yml as the container's configuration.
      - ./kibana:/usr/share/kibana/config/
    links:
      - elasticsearch:elasticsearch
    depends_on:
      - elasticsearch
    networks:
      - elknetwork

  logstash:
    image: logstash:7.7.0
    container_name: logstash
    hostname: logstash
    ports:
      - "9600:9600"
      # TCP input port declared in logstash.conf.
      - "8089:8089"
    volumes:
      # The image's pipelines.yml loads its pipeline config from this directory.
      - ./logstash:/usr/share/logstash/pipeline/
    links:
      - elasticsearch:elasticsearch
    depends_on:
      - elasticsearch
    networks:
      - elknetwork

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - "2181:2181"
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/datalog:/datalog
    networks:
      - elknetwork

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
    environment:
      # NOTE(review): 192.168.124.5 is presumably the Docker host's LAN
      # address in the author's setup - replace it with your own host IP.
      KAFKA_ADVERTISED_HOST_NAME: "192.168.124.5"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Topic created at start-up; format is name:partitions:replicas.
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://192.168.124.5:9092"
    volumes:
      # NOTE(review): this binds a project-local ./kafka/docker.sock path, not
      # the host's /var/run/docker.sock - confirm that this is intended.
      - "./kafka/docker.sock:/var/run/docker.sock"
      - "./kafka/data/:/kafka"
    depends_on:
      - zookeeper
    networks:
      - elknetwork

networks:
  elknetwork:
    driver: bridge
 
注意: logstash没数据源时会主动停止进程, 勿慌
 
kibana.yml 
---
# Kibana settings; this file is mounted into the container under
# /usr/share/kibana/config/.
server.name: kibana
server.host: "0"
# Elasticsearch is reached through its service hostname.
elasticsearch.hosts: ["http://elasticsearch:9200"]
monitoring.ui.container.elasticsearch.enabled: true
# Display the Kibana UI in Simplified Chinese.
i18n.locale: "zh-CN"
 
logstash.conf 
# Receive application log events over TCP and index them into Elasticsearch.
input {
  tcp {
    port => 8089
    # The Log4j2 Socket appender sends one compact JSON document per line
    # (JsonLayout with eventEol="true"), so decode every line as JSON instead
    # of storing the raw text in the "message" field.
    codec => json_lines
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
  }
}
 
docker中logstash默认配置(/usr/share/logstash/config/pipelines.yml) 
---
# A single pipeline called "main" whose configuration is read from the
# pipeline directory.
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"
 
docker中logstash默认配置(/usr/share/logstash/config/logstash.yml) 
---
http.host: "0.0.0.0"
# Monitoring data is sent to Elasticsearch via its service hostname.
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
 
熟悉docker中默认配置 
# Open an interactive shell inside the running "logstash" container.
docker exec -it logstash /bin/bash

# Inside the container: list the current directory.
ls

# Page through the main settings file. The file is named logstash.yml
# (not logstash.yaml).
more config/logstash.yml
 
logstash中 /usr/share/logstash/config/pipelines.yml 将配置文件指向 /usr/share/logstash/pipeline 目录下, 该目录下是我们真实的配置文件目录 
kibana中 /usr/share/kibana/config/kibana.yml 可以修改为中文 
 
  log4j2通过tcp将日志输出到logstash 
pom.xml 
<!-- Web starter with Spring Boot's default logging starter excluded -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Log4j2 starter used in place of the excluded logging starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
 
log4j2-spring.xml 
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: coloured console output plus a TCP socket appender
     that ships JSON events to port 8089. The file is re-checked for changes
     every 60 seconds (monitorInterval). -->
<Configuration status="OFF" monitorInterval="60">
    <Appenders>
        <!-- Console: accept INFO and above, reject everything else -->
        <Console name="Console" target="SYSTEM_OUT">
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}"/>
        </Console>

        <!-- Socket: one compact JSON document per line (eventEol), including
             context properties. An appender takes exactly one layout, so only
             the JSON layout is declared here. -->
        <Socket name="Socket" host="127.0.0.1" port="8089" protocol="TCP">
            <JsonLayout properties="true" compact="true" eventEol="true"/>
        </Socket>
    </Appenders>

    <Loggers>
        <Root level="INFO">
            <!-- Static properties attached to events logged through Root -->
            <property name="hostName">cuishiying</property>
            <property name="applicationName">elk-demo</property>
            <appender-ref ref="Console"/>
            <appender-ref ref="Socket"/>
        </Root>
    </Loggers>
</Configuration>
 
测试 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @RestController @RequestMapping("/test") public  class  LogController   {    private  Logger logger = LogManager.getLogger(LogController.class);          @RequestMapping(value = "/log4j2", method = RequestMethod.GET)      public  String testLog ()  {         try  {             MDC.put("traceid" , String.valueOf(System.currentTimeMillis()));             logger.info("Hello 这是 info message. 信息" );             logger.error("Hello 这是 error message. 报警" );             logger.warn("Hello 这是 warn message. 警告" );             logger.debug("Hello 这是 debug message. 调试" );             List<String> list = new  ArrayList<>();             System.out.println(list.get(2 ));         } catch  (Exception e) {             logger.error("testLog" , e);         } finally  {             MDC.clear();         }         return  "" ;     } } 
 
项目启动后,点击management可以看到: 
Logstash会按默认规则自动在ES中创建索引。我们需要在Kibana的Management中手动创建索引模式(index pattern),
然后在Discover中即可看到日志。
 
用socket方式将日志传输给logstash,如果把logstash停掉然后再启动,日志就无法继续传输了,也就是说socket无法自动重连,这在生产环境中,当然是个隐患。所以生产环境一般会用Logstash-gelf 
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration that writes to the console and sends GELF 1.1
     messages over UDP to 127.0.0.1:4567. -->
<Configuration>
    <Properties>
        <!-- JSON-shaped console line: logger, level and message -->
        <Property name="LOG_PATTERN">{"logger": "%logger", "level": "%level", "msg": "%message"}%n</Property>
    </Properties>

    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" follow="true">
            <PatternLayout pattern="${LOG_PATTERN}"/>
        </Console>

        <!-- ignoreExceptions="true": appender failures are not propagated to
             the application -->
        <Gelf name="logstash-gelf" host="udp:127.0.0.1" port="4567" version="1.1" ignoreExceptions="true">
            <!-- Extra fields added to every GELF message -->
            <Field name="timestamp" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}"/>
            <Field name="logger" pattern="%logger"/>
            <Field name="level" pattern="%level"/>
            <Field name="simpleClassName" pattern="%C{1}"/>
            <Field name="className" pattern="%C"/>
            <Field name="server" pattern="%host"/>
        </Gelf>
    </Appenders>

    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="logstash-gelf"/>
        </Root>
    </Loggers>
</Configuration>
 
  log4j2将日志输出到kafka 
log4j2-spring.xml 
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: coloured console output plus a Kafka appender that
     publishes every event to the "test" topic. The file is re-checked for
     changes every 60 seconds (monitorInterval). -->
<Configuration status="OFF" monitorInterval="60">
    <Properties>
        <!-- Console pattern, colourised by level -->
        <property name="log_pattern_console">%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}</property>
        <!-- Kafka pattern: timestamp, MDC trace id, level, location, message.
             This is plain text, not JSON. -->
        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %l - %m%n</property>
    </Properties>

    <Appenders>
        <!-- Console: accept INFO and above, reject everything else -->
        <Console name="Console" target="SYSTEM_OUT">
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${log_pattern_console}"/>
        </Console>

        <!-- Kafka: one record per log event on topic "test" -->
        <Kafka name="Kafka" topic="test">
            <PatternLayout pattern="${log_pattern}"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <!-- Keep the Kafka client's own logging at INFO: if its DEBUG output
             reached the Kafka appender, it would trigger recursive logging. -->
        <Logger name="org.apache.kafka" level="INFO"/>

        <Root level="INFO">
            <!-- Static properties attached to events logged through Root -->
            <property name="hostName">cuishiying</property>
            <property name="app_name">elk-demo</property>
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>
 
此时,日志可以直接输出到kafka中
  log4j2将日志通过kafka输出到elk中 
修改logstash.conf, 将输入源设置为kafka, 输出设置为es, 以kafka的topic和日期创建es索引
logstash.conf 
# Consume application logs from Kafka and index them into Elasticsearch, using
# one index per Kafka topic and day.
input {
    kafka {
        # bootstrap_servers is a string option (comma-separated host:port list).
        bootstrap_servers => "kafka:9092"
        topics_pattern => "test*"
        group_id => "logstash-test-group"
        consumer_threads => 5
        # Decode each record as JSON. This must be set explicitly: the kafka
        # input's default codec is "plain".
        codec => "json"
        # Adds Kafka details (topic, offset, group, partition, ...) to the
        # event under [@metadata][kafka].
        decorate_events => true
        auto_offset_reset => "latest"
        # Common option on all inputs; used below to route events when
        # several sources are configured.
        type => "test"
    }
    kafka {
        bootstrap_servers => "kafka:9092"
        topics_pattern => "ocpc*"
        group_id => "logstash-ocpc-group"
        consumer_threads => 5
        codec => "json"
        # Adds Kafka details (topic, offset, group, partition, ...) to the
        # event under [@metadata][kafka].
        decorate_events => true
        auto_offset_reset => "latest"
        # Common option on all inputs; used below to route events when
        # several sources are configured.
        type => "ocpc"
    }
}

filter {
    # The codec has already decoded the record. This second pass only expands
    # a "message" field that itself contains JSON; plain-text messages are
    # skipped quietly instead of being tagged as parse failures.
    json {
        source => "message"
        skip_on_invalid_json => true
    }
    mutate {
        remove_field => ["@version"]
    }
}

output {
    if [type] == "test" {
        elasticsearch {
            hosts => ["elasticsearch:9200"]
            # app_name must be a top-level field of the decoded event;
            # otherwise the literal text %{app_name} ends up in the index name.
            index => "%{[@metadata][kafka][topic]}-%{app_name}-%{+YYYYMMdd}"
            # Do not let Logstash install its own index template; rely on the
            # templates already present in Elasticsearch.
            manage_template => false
        }
        stdout {
            # Print each event, including @metadata, to the Logstash console.
            codec => rubydebug { metadata => true }
        }
    }

    if [type] == "ocpc" {
        elasticsearch {
            hosts => ["elasticsearch:9200"]
            index => "%{[@metadata][kafka][topic]}-%{+YYYYMMdd}"
            # Do not let Logstash install its own index template; rely on the
            # templates already present in Elasticsearch.
            manage_template => false
        }
        stdout {
            # Print each event, including @metadata, to the Logstash console.
            codec => rubydebug { metadata => true }
        }
    }
}
 
重启logstash容器, 使新的logstash.conf生效
# Restart only the logstash service so that it reloads its pipeline config.
docker-compose restart logstash
 
访问 http://localhost:5601 , 可以看到日志已正常输出到ELK中, 包括异常栈。
  日志链路追踪 
上边的 MDC 放入traceid的操作可以通过过滤器统一放入。
package com.easyliao.auth.common.filter;

import com.easyliao.auth.common.utils.IdUtils;
import com.easyliao.auth.common.utils.IpUtil;
import com.easyliao.auth.common.utils.RequestUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * Servlet filter that puts a trace id and the client IP into the logging MDC
 * for the duration of each request, logs the request URL and form parameters,
 * and removes both MDC entries once the request has been processed.
 *
 * NOTE(review): the class carries both {@code @Component} and
 * {@code @WebFilter}; if servlet component scanning is also enabled it may be
 * registered twice - confirm which registration is intended.
 */
@Slf4j
@WebFilter(urlPatterns = "/*", filterName = "logFilter")
@Order(value = Ordered.HIGHEST_PRECEDENCE)
@Component
public class LogFilter extends OncePerRequestFilter {

    /** MDC key, and request parameter name, of the trace id. */
    private static final String TRACE_ID = "traceid";
    /** MDC key of the client IP address. */
    private static final String IP = "ip";
    /** Placeholder trace id used when no real one is available. */
    private static final String DEFAULT_TRACE_ID = "0";

    /**
     * Populates the MDC, logs the request, runs the rest of the chain and
     * always cleans the MDC afterwards.
     */
    @Override
    protected void doFilterInternal(HttpServletRequest httpServletRequest,
                                    HttpServletResponse httpServletResponse,
                                    FilterChain filterChain) throws ServletException, IOException {
        try {
            // Trace id first, so that the request log line below already carries it.
            initTraceId(httpServletRequest);
            initIp(httpServletRequest);
            log(httpServletRequest, httpServletResponse);
            filterChain.doFilter(httpServletRequest, httpServletResponse);
        } finally {
            // Runs even when the chain throws, so no MDC entry outlives the request.
            afterLog(httpServletRequest, httpServletResponse);
        }
    }

    /** Logs the request URL and form parameters when INFO logging is enabled. */
    private void log(HttpServletRequest request, HttpServletResponse response) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("\n请求地址: [{}] \n请求参数: [{}]",
                request.getRequestURL().toString(),
                RequestUtils.getFormParams(request)
        );
    }

    /** Removes the MDC entries that this filter added. */
    private void afterLog(HttpServletRequest req, HttpServletResponse response) {
        MDC.remove(TRACE_ID);
        MDC.remove(IP);
    }

    /** Stores the client IP, as resolved by {@code IpUtil}, in the MDC. */
    private void initIp(HttpServletRequest servletRequest) {
        MDC.put(IP, IpUtil.getIpAddr(servletRequest));
    }

    /**
     * Takes the trace id from the "traceid" request parameter; when it is
     * blank or equal to the default placeholder, a new id is generated.
     */
    private void initTraceId(HttpServletRequest request) {
        String traceId = request.getParameter(TRACE_ID);
        if (StringUtils.isBlank(traceId) || this.defaultTraceId(traceId)) {
            traceId = this.genTraceId();
        }
        this.setTraceId(traceId);
    }

    /**
     * @param traceId the trace id to check
     * @return whether {@code traceId} equals the default placeholder
     */
    public Boolean defaultTraceId(String traceId) {
        return DEFAULT_TRACE_ID.equals(traceId);
    }

    /**
     * @return a new trace id obtained from {@code IdUtils.uuid()}
     */
    public String genTraceId() {
        return IdUtils.uuid();
    }

    /**
     * Stores the trace id in the MDC; a blank value is replaced by the
     * default placeholder.
     *
     * @param traceId the trace id to store, may be blank
     */
    public void setTraceId(String traceId) {
        traceId = StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
        MDC.put(TRACE_ID, traceId);
    }

    /**
     * @return the trace id currently in the MDC, or the default placeholder
     *         when none is set
     */
    public String getTraceId() {
        String traceId = MDC.get(TRACE_ID);
        return StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
    }
}
 
  logback+kafka+elk 
pom.xml 
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the elk-demo Spring Boot application (Logback variant,
     with the Logback Kafka appender). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>elk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>elk-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test starter without the JUnit vintage engine -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Logback appender that publishes log events to Kafka -->
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile as Java 1.8 with UTF-8 sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
 
logback-spring.xml 
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: every event goes to the console and to the Kafka
     topic "test", both using the same pattern. -->
<configuration>
    <!-- Shared pattern: timestamp, MDC trace id, level, line number, message -->
    <property name="log_pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %L - %m%n"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
    </appender>

    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
        <topic>test</topic>
        <!-- Records are sent without a key -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
        <!-- Asynchronous delivery strategy -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <!-- Kafka producer settings, one key=value pair per element -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <!-- NOTE(review): presumably the fallback appender used when events
             cannot be delivered to Kafka; confirm against the appender's README -->
        <appender-ref ref="STDOUT"/>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="kafkaAppender"/>
    </root>
</configuration>
 
application.properties 
# Load the logging configuration from logback-spring.xml on the classpath.
logging.config=classpath:logback-spring.xml
 
  最后 
本文到此结束,感谢阅读。如果您觉得不错,请关注公众号【当我遇上你】,您的支持是我写作的最大动力。
  参考