前言
我们知道 CSV 是一种非常流行的数据格式。在 Elastic Stack 中,我们有很多的方式来摄入 CSV 格式的数据。我们可以先看看一个常用的数据摄入数据流:
- 我们可以使用 Beats 所提供的 processors 来进行处理。
- 我们可以采用 Logstash 所提供的丰富过滤器来进行处理。
- 我们可以使用 Elasticsearch ingest 节点所提供的 processors 来进行处理。这个也是我们今天所需要讲的内容
- 我们还可以使用各类语言进行数据写入。
假设你有一个采用 CSV 格式的数据摄取流。 你可以选择开发一个例程,比如上面提到的 Python 应用来提取这些数据,但也许你可以通过使用 Elasticsearch 摄入节点所提供的 CSV 处理器来节省时间。
让我们配置 CSV 处理器来设置字段 “name”、“surname”、“address”、“phone”。 条目将是:“jhon bon;maclister;street wingol;552366636595”。
请注意,分隔符是 “;”,所以让我们将 “separator” 参数配置为值 “;”。
在 “target_fields” 字段中,我们将设置字段列表:“name”、“surname”、“address”、“phone”。出于教学目的,我不会使用其他参数,但如有必要,请访问文档。
我们的管道是这样的:
`
1. POST /_ingest/pipeline/_simulate?verbose=true
2. {
3. "pipeline": {
4. "description": "extract values from csv format",
5. "processors": [
6. {
7. "csv": {
8. "field": "csv_field",
9. "target_fields": [
10. "name", "surname", "address", "phone"
11. ],
12. "separator": ";"
13. }
14. }
15. ]
16. },
17. "docs": [
18. {
19. "_index": "index",
20. "_id": "id",
21. "_source": {
22. "csv_field": "jhon bon;maclister;street wingol;552366636595"
23. }
24. }
25. ]
26. }
`
上面的输出是:
`
1. {
2. "docs": [3. {4. "processor_results": [5. {6. "processor_type": "csv",7. "status": "success",8. "doc": {9. "_index": "index",10. "_id": "id",11. "_version": "-3",12. "_source": {13. "name": "jhon bon",14. "csv_field": "jhon bon;maclister;street wingol;552366636595",15. "address": "street wingol",16. "phone": "552366636595",17. "surname": "maclister"18. },19. "_ingest": {20. "pipeline": "_simulate_pipeline",21. "timestamp": "2023-02-03T03:17:44.015883051Z"22. }23. }24. }25. ]
26. }
27. ]
28. }
`
从输出中,我们可以看出来数据已经被结构化了。当然上面只是一个模拟的结果。我们需要使用如下的命令来创建一个处理 CSV 格式的 pipeline:
`
1. PUT /_ingest/pipeline/extract_csv
2. {
3. "description": "extract values from csv format",
4. "processors": [
5. {
6. "csv": {
7. "field": "csv_field",
8. "target_fields": [
9. "name",
10. "surname",
11. "address",
12. "phone"
13. ],
14. "separator": ";"
15. }
16. }
17. ]
18. }
`
这样我们就创建了一个叫做 extract_csv 的 pipeline。我们在实际写入文档时,可以这么使用这个 pipeline:
1. PUT twitter/_doc/1?pipeline=extract_csv
2. {
3. "csv_field": "jhon bon;maclister;street wingol;552366636595"
4. }
我们来查看该文档:
GET twitter/_doc/1
我们可以看到如下的结果:
`
1. {
2. "_index": "twitter",
3. "_id": "1",
4. "_version": 1,
5. "_seq_no": 0,
6. "_primary_term": 1,
7. "found": true,
8. "_source": {
9. "name": "jhon bon",
10. "csv_field": "jhon bon;maclister;street wingol;552366636595",
11. "address": "street wingol",
12. "phone": "552366636595",
13. "surname": "maclister"
14. }
15. }
`
很显然,文档已经被结构化了。
本文出至:学新通技术网
标签: