前言
检测解析的日志是否包含单个或多个警告消息,然后添加一个字段来说明这两种情况。在很多的情形下,我们在测试 Logstash 的过滤器时,并不急于把实际的 input 的数据接入到过滤器中来进行测试。我们首先来选择一个比较容易理解的 input 方式,使用一个文档来进行解析,并测试管道。在今天的文章中,我来详细介绍两种常用的方法来如何测试 Logstash 的管道/过滤器。
方法一:使用 generator
方法如下:
logstash.conf
1. input {
2. generator {
3. message => '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
4. count => 1
5. }
6. }
8. filter {
9. json {
10. source => "message"
11. }
13. if [paymentType] == "Mastercard" {
14. drop {}
15. }
17. mutate {
18. remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
19. }
21. }
23. output {
24. stdout {
25. codec => rubydebug
26. }
27. }
在上面,我们使用 generator 的方法来生成一个文档,并让这个文档经过 filter 部分,并最终在 console 中进行展示。我们可以通过如下的命令来运行上面的 Logstash 管道:
1. $ pwd
2. /Users/liuxg/elastic/logstash-8.6.1
3. $ ls logstash.conf
4. logstash.conf
5. $ ./bin/logstash -f logstash.conf

logstash_filter.conf
1. input {
2. file {
3. path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
4. type => "applog"
5. start_position => "beginning"
6. sincedb_path => "/dev/null"
7. }
8. }
10. filter {
11. json {
12. source => "message"
13. }
15. if [paymentType] == "Mastercard" {
16. drop {}
17. }
19. mutate {
20. remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
21. }
23. }
25. output {
26. stdout {
27. codec => rubydebug
28. }
29. }
我们可以使用诸如如下格式的测试文件来进行测试:
sample.json
1. {"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"Amex","name":"Merrill Duffield","gender":"Female","ip_address":"132.150.218.21","purpose":"Toys","country":"United Arab Emirates","age":33}
2. {"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}
3. {"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"Visa","name":"Harri Cayette","gender":"Female","ip_address":"227.6.210.146","purpose":"Sports","country":"Canada","age":27}
4. {"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"Mastercard","name":"Regan Stockman","gender":"Male","ip_address":"139.224.15.154","purpose":"Home","country":"Indonesia","age":34}
5. {"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Wilhelmina Polle","gender":"Female","ip_address":"252.254.68.68","purpose":"Health","country":"Ukraine","age":51}
当然实际的文档可能比这个要长很多。
更多关于 generator 方面的示例,请阅读我之前的文章 “Logstash:Data 转换,分析,提取,丰富及核心操作”。
方法二:使用 stdin input
假设我们有以下代表上述两种情况的日志文件:
1. $ pwd
2. /Users/liuxg/elastic/logstash-8.6.1
3. $ cat multivaluewarn.json
4. {"waf": {"ver": "2.0","warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318","denyMsg": "Anomaly Score Exceeded for SQL Injection","denyActions": "3","warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"}}
5. $
6. $ cat singlevaluewarn.json
7. {"waf": {"ver": "2.0","warnRules": "681984","policy": "api_89894","warnMsg": "Alert rq without DEVICEID header","warnTags": "DEVICEID_Detection","warnActions": "2"}}
multivaluewarn.json
{"waf": {"ver": "2.0","warnRules": "3000030;3000057;950001;950109;959073;973335;981173;981244;981318","denyMsg": "Anomaly Score Exceeded for SQL Injection","denyActions": "3","warnMsg": "Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"}}
1. {
2. "waf":{
3. "ver":"2.0",
4. "warnRules":"3000030;3000057;950001;950109;959073;973335;981173;981244;981318",
5. "denyMsg":"Anomaly Score Exceeded for SQL Injection",
6. "denyActions":"3",
7. "warnMsg":"Basic SQL Authentication Bypass Attempts 3/3;Cross-site Scripting (XSS) common keywords;SQL Injection Attack;Multiple URL Encoding Detected;SQL Injection Attack;IE XSS Filters - Attack Detected;Restricted SQL Character Anomaly Detection Alert - Total # of special characters exceeded;Basic SQL Authentication Bypass Attempts 1/3;SQL Injection Attack: Common Injection Testing Detected"
8. }
9. }
singlevaluewarn.json
{"waf": {"ver": "2.0","warnRules": "681984","policy": "api_89894","warnMsg": "Alert rq without DEVICEID header","warnTags": "DEVICEID_Detection","warnActions": "2"}}
1. {
2. "waf":{
3. "ver":"2.0",
4. "warnRules":"681984",
5. "policy":"api_89894",
6. "warnMsg":"Alert rq without DEVICEID header",
7. "warnTags":"DEVICEID_Detection",
8. "warnActions":"2"
9. }
10. }
阅读日志我们可以看到字段 [waf][warnMsg] 使用分号分隔警告消息; 在多次警告的情况下。
将收集到的信息转换为 Logstash 管道将导致:
logstash_warning.conf
1. input {
2. stdin { codec => json }
3. }
5. filter {
6. if ";" in [waf][warnMsg]{
7. mutate {
8. add_field => [ "wafWarningMSG", "multi warnings" ]
9. }
10. }
11. else {
12. mutate {
13. add_field => [ "wafWarningMSG", "single" ]
14. }
15. }
16. }
18. output {
19. stdout {
20. codec => rubydebug
21. }
22. }
将管道添加到 conf 文件(称为 logstash_warning.conf ),然后使用如下的命令来测试 pipeline:
1. $ pwd
2. /Users/liuxg/elastic/logstash-8.6.1
3. $ ls logstash_warning.conf
4. logstash_warning.conf
5. $ ./bin/logstash -f logstash_warning.conf < multivaluewarn.json
./bin/logstash -f logstash_warning.conf < singlevaluewarn.json
一旦我们测试好 pipeline 中的过滤器,我们就可以直接把 input 部分换成我们想要的格式即可。
希望你觉得它有用,如有任何问题,请随时联系我!
本文出至:学新通技术网
标签: