• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Logstash:如何使用 Logstash 解析并摄入 JSON 数据到 Elasticsearch

武飞扬头像
Elasticsearch
帮助43

准备数据

我们先来把如下的数据拷贝下来,并保存到一个叫做 sample.json 的文件中。我们可以把这个文件置于 Logstash 的安装根目录下。

sample.json



1.  {"id":1,"timestamp":"2019-09-12T13:43:42Z","paymentType":"Amex","name":"Merrill Duffield","gender":"Female","ip_address":"132.150.218.21","purpose":"Toys","country":"United Arab Emirates","age":33}
2.  {"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}
3.  {"id":3,"timestamp":"2019-07-14T04:48:25Z","paymentType":"Visa","name":"Harri Cayette","gender":"Female","ip_address":"227.6.210.146","purpose":"Sports","country":"Canada","age":27}
4.  {"id":4,"timestamp":"2020-02-29T12:41:59Z","paymentType":"Mastercard","name":"Regan Stockman","gender":"Male","ip_address":"139.224.15.154","purpose":"Home","country":"Indonesia","age":34}
5.  {"id":5,"timestamp":"2019-08-03T19:37:51Z","paymentType":"Mastercard","name":"Wilhelmina Polle","gender":"Female","ip_address":"252.254.68.68","purpose":"Health","country":"Ukraine","age":51}




1.  $ pwd
2.  /Users/liuxg/elastic/logstash-8.6.1
3.  $ ls sample.json 
4.  sample.json


解析及过滤 JSON 文件

我们有如下的几种方法:

使用 Logstash 的 Input JSON codec

我们创建如下的 Logstash 配置文件:

logstash_input.conf

`

1.  input {
2.    file {
3.      path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
4.      type    => "applog"
5.      codec   => "json"
6.      start_position => "beginning"
7.      sincedb_path => "/dev/null"
8.    }
9.  }

11.  output {
12.  	stdout { 
13.  	  codec => rubydebug 
14.  	}
15.  }

`![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)

我们运行 Logstash:



1.  $ pwd
2.  /Users/liuxg/elastic/logstash-8.6.1
3.  $ ./bin/logstash -f logstash_input.conf


在运行的 terminal 中,我们可以看到如下的结果:

>从上面,我们可以看出来我们的数据已经变为结构化的数据了。

使用 JSON filter

我们创建如下的一个 Logstash 配置文件:

logstash_filter.conf

`

1.  input {
2.    file {
3.      path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
4.      type    => "applog"
5.      start_position => "beginning"
6.      sincedb_path => "/dev/null"
7.    }
8.  }

10.  filter {
11.      json {
12.          source => "message"
13.      }
14.  }

16.  output {
17.  	stdout { 
18.  	  codec => rubydebug 
19.  	}
20.  }

`![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)

我们运行 Logstash:



1.  $ pwd
2.  /Users/liuxg/elastic/logstash-8.6.1
3.  $ ./bin/logstash -f logstash_filter.conf


在运行的 terminal 中,我们可以看到如下的结果:

>从上面我们可以看出来 JSON 文件也被正确地结构化了。

接下来,我们来清理一下我们的数据,并过滤掉那些  paymentType 为 Mastercard 的文档。我们进一步修改配置文件:

logstash_filter.conf

`

1.  input {
2.    file {
3.      path => "/Users/liuxg/elastic/logstash-8.6.1/sample.json"
4.      type    => "applog"
5.      start_position => "beginning"
6.      sincedb_path => "/dev/null"
7.    }
8.  }

10.  filter {
11.      json {
12.          source => "message"
13.      }

15.      if [paymentType] == "Mastercard" {
16.          drop {}
17.      }

19.      mutate {
20.          remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
21.      }

23.  }

25.  output {
26.  	stdout { 
27.  	  codec => rubydebug 
28.  	}
29.  }

`![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)

在上面,我们过滤掉 paymentType 为 Mastercard 的文档,同时,我们也去除一些不需要的字段,比如 message。我们再次运行:

>很显然,我们这次没有看到 message 字段,同时 paymentType 为 Mastercard 的文档都被过滤掉了。

处理含有数值的 JSON 数据

接下来我们使用另外一组数据。在这个数据里,它的 JSON 文件里含有一个数组:

sample-split.json



1.  {"id":1,"timestamp":"2019-06-19T23:04:47Z","paymentType":"Mastercard","name":"Ardis Shimuk","gender":"Female","ip_address":"91.33.132.38","purpose":"Home","country":"France","pastEvents":[{"eventId":1,"transactionId":"trx14224"},{"eventId":2,"transactionId":"trx23424"}],"age":34}
2.  {"id":2,"timestamp":"2019-11-26T15:40:56Z","paymentType":"Amex","name":"Benoit Urridge","gender":"Male","ip_address":"26.71.230.228","purpose":"Shoes","country":"Brazil","pastEvents":[{"eventId":3,"transactionId":"63323-064"},{"eventId":4,"transactionId":"0378-3120"}],"age":51}
3.  {"id":3,"timestamp":"2019-05-08T16:24:25Z","paymentType":"Visa","name":"Lindsy Ketchell","gender":"Female","ip_address":"189.216.71.184","purpose":"Home","country":"Brazil","pastEvents":[{"eventId":5,"transactionId":"68151-3826"},{"eventId":6,"transactionId":"52125-611"}],"age":26}
4.  {"id":4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"223.113.73.232","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
5.  {"id":5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"159.148.102.98","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}


请注意上面最后面的一个空行。这样可以保证所有的文档被摄入。如上所示,它含有一个叫做 pastEvents 的字段。它是一个数组。在每个文档中,它含有 1 个或多个 eventId。我们可以通过 split 过滤器来把这些 eventId 变成单个的事件。

我们创建如下的一个 Logstash 配置文件:

logstash_split.conf

`

1.  input {
2.    file {
3.      path => "/Users/liuxg/elastic/logstash-8.6.1/sample-split.json"
4.      type    => "applog"
5.      start_position => "beginning"
6.      sincedb_path => "/dev/null"
7.    }
8.  }

10.  filter {
11.      json {
12.          source => "message"
13.      }

15.      split {
16.          field => "[pastEvents]"
17.      }

19.      mutate {
20.          remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
21.      }

23.  }

25.  output {
26.  	stdout { 
27.  	  codec => rubydebug 
28.  	}
29.  }

`![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)

我们可以参考官方文档 Split filter plugin | Logstash Reference [8.6] | Elastic 来了解更多关于 split 过滤的功能。

我们运行上面的 pipeline:

./bin/logstash -f logstash_split.conf

>我们可以看到在之前的源 JSON 文档中,它共有5个文档,但是经过 split 过滤器后,它现在变为 10 个文档了。

输出到 Elasticsearch

经过上面的 input 及 filter,我们得到了我们想要的结构化的数据。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanfbffb
系列文章
更多 icon
同类精品
更多 icon
继续加载