
Logstash: Testing Logstash pipelines/filters before implementation

Elasticsearch

Introduction

Detect whether a parsed log contains a single warning message or multiple warning messages, and then add a field that records which of the two cases applies. In many situations, when testing Logstash filters we are in no hurry to connect the real input data to the filters. We first choose an input method that is easy to understand, parse a single document with it, and use that to test the pipeline. In today's article I describe in detail two commonly used methods for testing a Logstash pipeline/filter.

Method 1: Using the generator input

The method is as follows:

logstash.conf


input {
  generator {
    message => '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
    count => 1
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  mutate {
    remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}


Above, we use the generator input to produce a single document, pass it through the filter section, and finally display the result on the console. We can run the Logstash pipeline above with the following commands:


$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ls logstash.conf
logstash.conf
$ ./bin/logstash -f logstash.conf

From the output we can see that the json filter works correctly. For the sake of a simple illustration, this example uses only a few filters. In real use, a pipeline may consist of many filters. Once we have confirmed that these filters do what we need, we simply swap in the input we actually want, for example:

logstash_filter.conf


input {
  file {
    path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
    type => "applog"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  mutate {
    remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}


We can test with a file in a format such as the following:

sample.json


{"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"Amex","name":"Merrill Duffield","gender":"Female","ip_address":"132.150.218.21","purpose":"Toys","country":"United Arab Emirates","age":33}
{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}
{"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"Visa","name":"Harri Cayette","gender":"Female","ip_address":"227.6.210.146","purpose":"Sports","country":"Canada","age":27}
{"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"Mastercard","name":"Regan Stockman","gender":"Male","ip_address":"139.224.15.154","purpose":"Home","country":"Indonesia","age":34}
{"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Wilhelmina Polle","gender":"Female","ip_address":"252.254.68.68","purpose":"Health","country":"Ukraine","age":51}


Of course, a real document may be much longer than this.

For more examples of the generator input, please read my earlier article “Logstash: Data transformation, analysis, extraction, enrichment and core operations”.
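To know what the generator and file runs above should print before wiring up the real input, it helps to have a reference model of the filter section to compare the rubydebug output against. The following Python script is an added example, not code from the original post and not part of Logstash: it applies the same three rules (parse the message field as JSON, drop events whose paymentType is Mastercard, remove the listed metadata fields) to a constructed copy of the sample.json lines, and also shows what happens to a line that is not valid JSON, which the page's pipeline does not exercise. The metadata fields that make_event attaches are an assumed approximation of what the file input adds, not Logstash's exact event shape.

```python
import json

# Constructed copy of the five sample.json lines shown in the post
# (ids 4 and 5 are Mastercard and should be dropped by the filter).
SAMPLE_LINES = [
    '{"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"Amex","name":"Merrill Duffield","gender":"Female","ip_address":"132.150.218.21","purpose":"Toys","country":"United Arab Emirates","age":33}',
    '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}',
    '{"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"Visa","name":"Harri Cayette","gender":"Female","ip_address":"227.6.210.146","purpose":"Sports","country":"Canada","age":27}',
    '{"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"Mastercard","name":"Regan Stockman","gender":"Male","ip_address":"139.224.15.154","purpose":"Home","country":"Indonesia","age":34}',
    '{"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Wilhelmina Polle","gender":"Female","ip_address":"252.254.68.68","purpose":"Health","country":"Ukraine","age":51}',
]

# The fields the post's mutate filter removes.
REMOVED_FIELDS = ["message", "@timestamp", "path", "host", "@version", "log", "event"]


def make_event(line, path="/tmp/sample.json", host="testhost"):
    """Build the event an input would hand to the filter: the raw line in
    'message' plus metadata fields (assumed shape, constructed values)."""
    return {
        "message": line,
        "@timestamp": "2023-01-01T00:00:00.000Z",
        "path": path,
        "host": host,
        "@version": "1",
        "log": {"file": {"path": path}},
        "event": {"original": line},
    }


def apply_filter(event):
    """Reference model of the filter block: json -> drop Mastercard -> remove_field.
    Returns the filtered event, or None when the event is dropped."""
    try:
        parsed = json.loads(event["message"])
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict):
        # The json filter tags unparsable input instead of dropping it,
        # so the event continues through the remaining filters.
        event.setdefault("tags", []).append("_jsonparsefailure")
        parsed = {}
    event.update(parsed)  # json { source => "message" } merges keys into the event
    if event.get("paymentType") == "Mastercard":
        return None  # drop {}
    for field in REMOVED_FIELDS:
        event.pop(field, None)
    return event


def run_pipeline(lines):
    """Feed each line through the modelled pipeline and collect what stdout would show."""
    output = []
    for line in lines:
        event = apply_filter(make_event(line))
        if event is not None:
            output.append(event)
    return output


if __name__ == "__main__":
    for event in run_pipeline(SAMPLE_LINES):
        print(json.dumps(event, indent=2))  # stand-in for the rubydebug codec
```

Running it lists only ids 1 to 3 with the metadata fields gone, which is exactly what the rubydebug output of the real pipeline should contain; any difference points at a filter that does not do what you assumed.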

Method 2: Using the stdin input

Suppose we have the following log files, representing the two cases described above:


$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ cat multivaluewarn.json
{"waf": {"ver": "2.0","warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318","denyMsg": "Anomaly Score Exceeded for SQL Injection","denyActions": "3","warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"}}
$
$ cat singlevaluewarn.json
{"waf": {"ver": "2.0","warnRules": "681984","policy": "api_89894","warnMsg": "Alert rq without DEVICEID header","warnTags": "DEVICEID_Detection","warnActions": "2"}}


multivaluewarn.json (pretty-printed)

{
  "waf": {
    "ver": "2.0",
    "warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318",
    "denyMsg": "Anomaly Score Exceeded for SQL Injection",
    "denyActions": "3",
    "warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"
  }
}


singlevaluewarn.json (pretty-printed)

{
  "waf": {
    "ver": "2.0",
    "warnRules": "681984",
    "policy": "api_89894",
    "warnMsg": "Alert rq without DEVICEID header",
    "warnTags": "DEVICEID_Detection",
    "warnActions": "2"
  }
}


Reading the logs, we can see that in the multiple-warning case the field [waf][warnMsg] separates the individual warning messages with semicolons.
Turning this observation into a Logstash pipeline gives:

logstash_warning.conf


input {
  stdin { codec => json }
}

filter {
  if ";" in [waf][warnMsg] {
    mutate {
      add_field => { "wafWarningMSG" => "multi warnings" }
    }
  }
  else {
    mutate {
      add_field => { "wafWarningMSG" => "single" }
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}


Put the pipeline into a conf file (called logstash_warning.conf), and then test the pipeline with the following command:


$ pwd
/Users/liuxg/elastic/logstash-8.6.1
$ ls logstash_warning.conf
logstash_warning.conf
$ ./bin/logstash -f logstash_warning.conf < multivaluewarn.json


The output shows a new field named wafWarningMSG containing "multi warnings".

Of course, we can also test with the following command:

./bin/logstash -f logstash_warning.conf < singlevaluewarn.json

From that output we can see that the value of the wafWarningMSG field is single.

Once the filters in the pipeline have been tested, we can simply replace the input section with whatever input we actually want.
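The same idea applied to method 2: before feeding real WAF logs through the pipeline, pin down what the conditional should produce for both sample files and for an input that has no warnMsg field at all. The Python script below is an added example, not code from the original post and not Logstash code. It models the `stdin { codec => json }` input and the `";" in [waf][warnMsg]` conditional over constructed copies of the two log lines, and then goes one step beyond the page by splitting warnRules and warnMsg on the semicolon so that the number of rule ids can be checked against the number of messages. The wafWarningCount and wafRulesMatchMsgs fields are hypothetical additions that the page's pipeline does not emit.

```python
import json

# Constructed copies of the two WAF log lines shown in the post.
MULTI_LINE = '{"waf": {"ver": "2.0","warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318","denyMsg": "Anomaly Score Exceeded for SQL Injection","denyActions": "3","warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"}}'
SINGLE_LINE = '{"waf": {"ver": "2.0","warnRules": "681984","policy": "api_89894","warnMsg": "Alert rq without DEVICEID header","warnTags": "DEVICEID_Detection","warnActions": "2"}}'


def decode_line(line):
    """Mimic `stdin { codec => json }`: the whole line is decoded into the event."""
    return json.loads(line)


def waf_section(event):
    """Return the [waf] object, or an empty dict when it is missing or not an object."""
    waf = event.get("waf")
    return waf if isinstance(waf, dict) else {}


def tag_warning_kind(event):
    """Model of the post's conditional: add wafWarningMSG depending on whether
    [waf][warnMsg] contains a semicolon. A missing field contains no ';', so it
    lands in the else branch and is labelled "single", as the page's test would."""
    warn_msg = waf_section(event).get("warnMsg", "")
    event["wafWarningMSG"] = "multi warnings" if ";" in warn_msg else "single"
    return event


def split_warnings(event):
    """Extension beyond the page: split warnRules and warnMsg on ';' into lists
    so each rule id can be lined up with its message. Empty pieces are dropped."""
    waf = waf_section(event)
    rules = [r for r in str(waf.get("warnRules", "")).split(";") if r]
    msgs = [m for m in str(waf.get("warnMsg", "")).split(";") if m]
    return rules, msgs


def process(line):
    """Full modelled pipeline for one stdin line. wafWarningCount and
    wafRulesMatchMsgs are hypothetical extra fields, not part of the post."""
    event = tag_warning_kind(decode_line(line))
    rules, msgs = split_warnings(event)
    event["wafWarningCount"] = len(msgs)
    event["wafRulesMatchMsgs"] = len(rules) == len(msgs)
    return event


if __name__ == "__main__":
    for line in (MULTI_LINE, SINGLE_LINE):
        print(json.dumps(process(line), indent=2))  # stand-in for the rubydebug codec
```

The multi-warning line yields nine rule ids and nine messages, so the two lists line up; a mismatch on real data would mean the WAF emitted a message containing a literal semicolon, which is exactly the kind of parsing surprise this pre-implementation testing is meant to catch.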

I hope you find this useful; if you have any questions, feel free to contact me!

This article is reposted from: 学新通

  • Site name: 学新通
  • Article URL: https://www.swvq.com/boutique/detail/tanfbfej