• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

使用 Elasticsearch 摄取管道优化数据

武飞扬头像
Elasticsearch
帮助94

在本文中,我想快速了解 Elasticsearch 提供的一个(众多)有趣的功能,我也倾向于在生产设置中使用它,即 Ingest Pipelines。 此功能允许在实际文档索引发生之前预处理文档。 听起来很有趣,为什么……

这在很多方面都很有用,但我可以看到两个主要原因。 首先,当你处理(大)数据分析/处理方面时,你必须在原始数据到来时对其进行处理,并对它们进行预处理以满足你的数据需求。 其次,即使你打算更换生产商,你 A) 可能无法直接控制所有 producers(例如,由于某些组织方面,它们的遗留性质,例如)和/或 B) 仍然需要做好准备,以应对可能发生的变化相当长的一段时间(例如,考虑要在共享日志库的新版本上升级数十/数百个微服务等)。

对于这些情况,你可以考虑使用 Elastic 所提供的 ingest pipeline 来帮助处理数据。

Ingest pipeline 是如何工作的?

Ingest pipeline,也即摄取管道, 是 Logstash 长期占据的数据解析和转换领域的新成员。 Ingest 管道实际上并不能完全替代 Logstash。 但是 Logstash 的考虑是它不可避免地会为你的体系结构带来另一个软件组件,从而使其操作起来更加复杂。 摄取管道不是这种情况,因为它们由集群中的任何(或所有)常规 Elasticsearch 节点直接执行。 该节点只需要是 Ingest 节点的类型(默认情况下),因此你甚至不需要在开始使用它们时弄乱配置。

在索引操作期间,协调器节点收到请求后,就会立即在摄取节点上执行管道。 图片来源: elastic.co

Pipeline 定义

通常,摄取管道是通过一个简单的 JSON 文档定义的,该文档包含一组处理器,这些处理器代表一组有序的步骤,这些步骤应用于所有传入文档并执行。 实际的处理器有多种类型,因此我强烈建议你浏览文档中的列表以熟悉它们。

处理器有一些共同点。 这些是:

  • 使用大括号模板 {{service-name}} 访问/引用定义中已处理文档的数据的能力
  • 能够使用 if 子句定义处理器的条件执行,允许仅在检查条件后执行步骤
  • 使用 on_failure 子句处理处理器故障的能力
  • 可以通过 tag 子句进行标记,这很有用,例如。 用于错误跟踪

测试环境启动

我们有了足够的理论,让我们在本地机器上的 Docker 中启动一个简单的(单节点)集群,并尝试第一个管道定义。 注意:如果你需要有关 docker 部分的更多信息,请在我之前的文章 “Elasticsearch:如何在 Docker 上运行 Elasticsearch 8.x 进行本地开发” 中找到它。 你也可以选择其他的方式来部署 Elasticsearch 及 Kibana。请参阅如下的文章:

使用摄取管道来处理文档

我们首先想创建一个如下功能的 pipeline:

在 Kibana 中,我们使用如下的命令来进行模拟:



1.  POST _ingest/pipeline/_simulate
2.  {
3.    "pipeline": {
4.      "description": "User split, foreach and upper processors",
5.      "processors": [
6.        {
7.          "split": {
8.            "field": "meta.tags",
9.            "separator": " ",
10.            "target_field": "meta.tags_parsed",
11.            "ignore_missing": true
12.          }
13.        },
14.        {
15.          "foreach": {
16.            "field": "meta.tags_parsed",
17.            "processor": {
18.              "uppercase": {
19.                "field": "_ingest._value"
20.              }
21.            }
22.          }
23.        }
24.      ]
25.    },
26.    "docs": [
27.      {
28.        "_source": {
29.          "meta": {
30.            "tags": "good enjoyed recommended"
31.          }
32.        }
33.      }
34.    ]
35.  }


上面的命令的响应为:



1.  {
2.    "docs": [
3.      {
4.        "doc": {
5.          "_index": "_index",
6.          "_id": "_id",
7.          "_version": "-3",
8.          "_source": {
9.            "meta": {
10.              "tags_parsed": [
11.                "GOOD",
12.                "ENJOYED",
13.                "RECOMMENDED"
14.              ],
15.              "tags": "good enjoyed recommended"
16.            }
17.          },
18.          "_ingest": {
19.            "_value": null,
20.            "timestamp": "2023-03-07T03:29:46.844735427Z"
21.          }
22.        }
23.      }
24.    ]
25.  }


从输出中,我们可以看到,meta.tags 以空格为分隔符进行拆分,之后再经过 foreach 处理器,分别对每个拆分后的词进行大写。一旦我们使用上面的 simulate 端点测试好以后,我们就可以使用如下的命令来进行定义:



1.  PUT _ingest/pipeline/my_simple_pipeline
2.  {
3.    "processors": [
4.      {
5.        "split": {
6.          "field": "meta.tags",
7.          "separator": " ",
8.          "target_field": "meta.tags_parsed",
9.          "ignore_missing": true
10.        }
11.      },
12.      {
13.        "foreach": {
14.          "field": "meta.tags_parsed",
15.          "processor": {
16.            "uppercase": {
17.              "field": "_ingest._value"
18.            }
19.          }
20.        }
21.      }
22.    ]
23.  }


在上面,我们定义了一个叫做 my_simple_pipeline 的摄取管道。现在让我们创建一个索引,该索引将使用此管道作为其默认入口管道。



1.  PUT my_index
2.  {
3.    "settings": {
4.      "index.default_pipeline": "my_simple_pipeline"
5.    }
6.  }


最后让我们索引一个将由该管道处理的虚拟文档:



1.  POST my_index/_doc
2.  {
3.    "meta": {
4.      "tags": "good enjoyed recommended"
5.    }
6.  }


我们可以对 my_index 进行检索:

GET my_index/_search

我们可以看到如下的内容:



1.  {
2.    "took": 222,
3.    "timed_out": false,
4.    "_shards": {
5.      "total": 1,
6.      "successful": 1,
7.      "skipped": 0,
8.      "failed": 0
9.    },
10.    "hits": {
11.      "total": {
12.        "value": 1,
13.        "relation": "eq"
14.      },
15.      "max_score": 1,
16.      "hits": [
17.        {
18.          "_index": "my_index",
19.          "_id": "vc4muoYB6XeJoCxQWgC_",
20.          "_score": 1,
21.          "_source": {
22.            "meta": {
23.              "tags_parsed": [
24.                "GOOD",
25.                "ENJOYED",
26.                "RECOMMENDED"
27.              ],
28.              "tags": "good enjoyed recommended"
29.            }
30.          }
31.        }
32.      ]
33.    }
34.  }


基于值的脚本和条件执行

现在让我们尝试两件事,为了有效,让我们在一个处理器中同时使用它们 :)

  • 使用 if 子句的条件执行(这对所有处理器都是可能的)
  • 以及使用 script processor 进行更复杂的处理(当预制处理器不够用时)

两者都使用 Painless 脚本语言(基于 Groovy)。 阅读 Painless 指南以获取更多信息。 然而重要的是:

  • 你可以通过 ctx 变量在脚本中访问你处理过的文档(所谓的摄取处理器上下文
  • ctx 包含提取的 JSON 的 Map 结构(通过方括号 ctx['my_field'] 引用各个字段)
  • 可以通过修改 ctx 变量的值来增加、修改或删除文档的字段

简单示例:如果其余部分对我们的目的没有用,我们可以使用它来仅索引原始文本的一部分(子字符串)。 所以只需更换管道......



1.  POST _ingest/pipeline/_simulate
2.  {
3.    "pipeline": {
4.      "description": "extract only part of the comments",
5.      "processors": [
6.        {
7.          "script": {
8.            "source": """
9.              ctx.comment = ctx.comment.substring(0,20)   " ..."
10.            """,
11.            "if": "ctx.containsKey('comment') && ctx['comment'].length() > 20"
12.          }
13.        }
14.      ]
15.    },
16.    "docs": [
17.      {
18.        "_source": {
19.          "comment": "Hello, this is a message which deserves a hair cut."
20.        }
21.      },
22.      {
23.        "_source": {
24.          "comment": "This is a short one"
25.        }
26.      }
27.    ]
28.  }


上面命令生成的结果为:



1.  {
2.    "docs": [
3.      {
4.        "doc": {
5.          "_index": "_index",
6.          "_id": "_id",
7.          "_version": "-3",
8.          "_source": {
9.            "comment": "Hello, this is a mes ..."
10.          },
11.          "_ingest": {
12.            "timestamp": "2023-03-07T03:54:32.402316754Z"
13.          }
14.        }
15.      },
16.      {
17.        "doc": {
18.          "_index": "_index",
19.          "_id": "_id",
20.          "_version": "-3",
21.          "_source": {
22.            "comment": "This is a short one"
23.          },
24.          "_ingest": {
25.            "timestamp": "2023-03-07T03:54:32.402361462Z"
26.          }
27.        }
28.      }
29.    ]
30.  }


提取结构化字段(解析默认的 NGINX 日志行格式)

如果你正在处理以某种明确定义的格式构建的数据(但未在单个字段中提取),你可以试用 dissect Processor。 只需用带百分号 %{my_field} 的大括号括起要提取的各个字段来描述模式。

我们可以使用这个处理器从默认的 NGINX 日志行格式(组合/主)中解析结构化字段,它具有以下结构。



1.  log_format main '$remote_addr - $remote_user [$time_local] '
2.  '"$request" $status $body_bytes_sent '
3.  '"$http_referer" "$http_user_agent"';


有关 NGINX 日志记录的更多信息,请参阅日志记录模块 ngx_http_log_module 的文档。 我们还可以使用 date processor 提取 @timestamp,因为原始值默认为非标准格式。

让我们把它放在一个新的管道中:



1.  POST _ingest/pipeline/_simulate
2.  {
3.    "pipeline": {
4.      "description": "structure NGINX logs",
5.      "processors": [
6.        {
7.          "dissect": {
8.            "field": "message",
9.            "pattern": "%{remote_addr} - %{remote_user} [%{time_local}] \"%{request}\" %{status} %{body_bytes_sent} \"%{http_referer}\" \"%{http_user_agent}\" \"%{http_x_forwarded_for}\""
10.          }
11.        },
12.        {
13.          "date": {
14.            "field": "time_local",
15.            "formats": [
16.              "dd/MMM/yyyy:HH:mm:ss Z"
17.            ],
18.            "timezone": "Europe/Prague"
19.          }
20.        }
21.      ]
22.    },
23.    "docs": [
24.      {
25.        "_source": {
26.          "message": "172.17.0.1 - - [24/Dec/2019:10:09:42  0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\""
27.        }
28.      }
29.    ]
30.  }


上面命令运行的结果为:



1.  {
2.    "docs": [
3.      {
4.        "doc": {
5.          "_index": "_index",
6.          "_id": "_id",
7.          "_version": "-3",
8.          "_source": {
9.            "remote_user": "-",
10.            "remote_addr": "172.17.0.1",
11.            "request": "GET / HTTP/1.1",
12.            "@timestamp": "2019-12-24T11:09:42.000 01:00",
13.            "http_referer": "-",
14.            "body_bytes_sent": "95",
15.            "http_x_forwarded_for": "-",
16.            "time_local": "24/Dec/2019:10:09:42  0000",
17.            "message": "172.17.0.1 - - [24/Dec/2019:10:09:42  0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\"",
18.            "status": "200",
19.            "http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
20.          },
21.          "_ingest": {
22.            "timestamp": "2023-03-07T04:07:13.2829158Z"
23.          }
24.        }
25.      }
26.    ]
27.  }


你可以对不同格式的内容使用类似的处理器,例如 CSV processor(从 csv 中提取字段)、KV processor(解析键=值对)或基于正则表达式的 Grok processor

我们可以创建 nginx_pipeline 摄取管道:



1.  PUT _ingest/pipeline/nginx_pipeline
2.  {
3.    "processors": [
4.      {
5.        "dissect": {
6.          "field": "message",
7.          "pattern": "%{remote_addr} - %{remote_user} [%{time_local}] \"%{request}\" %{status} %{body_bytes_sent} \"%{http_referer}\" \"%{http_user_agent}\" \"%{http_x_forwarded_for}\""
8.        }
9.      },
10.      {
11.        "date": {
12.          "field": "time_local",
13.          "formats": [
14.            "dd/MMM/yyyy:HH:mm:ss Z"
15.          ],
16.          "timezone": "Europe/Prague"
17.        }
18.      }
19.    ]
20.  }


我们可以通过如下的方式来摄取一个文档:



1.  PUT nginx_index/_doc/1?pipeline=nginx_pipeline
2.  {
3.    "message": "172.17.0.1 - - [24/Dec/2019:10:09:42  0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\""
4.  }


用另一个索引的值丰富

我们将使用的最后一件事是 enrich Processor,它可用于使用来自另一个索引的数据来丰富你摄取的文档。 酷吧!

在你的管道中使用它之前,它具有三个先决条件:

  • 你需要有我们将从中获取丰富数据的源索引
  • 你需要定义一个丰富的策略来定义源索引、匹配字段和附加字段
  • 您需要 _execute 丰富策略来为该策略创建丰富索引

让我们扩展前面的示例并使用已知 IP 地址的源索引。我们将检查以附加一些数据并查看 IP 是否不在潜在的黑名单中。

首先创建源索引和一个文档(注意:使用 refresh 查询参数以确保索引立即可用于搜索):



1.  PUT ip_source_index/_doc/1?refresh=wait_for
2.  {
3.    "ip": "172.17.0.1",
4.    "black_listed": false,
5.    "user_category": "test"
6.  }


接下来将创建 enrich 策略。 很简单 —— 只需链接我们的源索引(我们在上面创建的),匹配 ip 字段并列出相关字段。



1.  PUT _enrich/policy/ip_policy
2.  {
3.    "match": {
4.      "indices": "ip_source_index",
5.      "match_field": "ip",
6.      "enrich_fields": [
7.        "black_listed",
8.        "user_category"
9.      ]
10.    }
11.  }


我们需要执行它来创建丰富的索引:

POST _enrich/policy/ip_policy/_execute

现在我们终于可以将 enrich Processor 添加到我们之前的 nginx 管道中了。 我们需要引用丰富策略、我们将匹配的字段(我们在上一步中提取的 remote_addr)、丰富数据的目标字段和 max_matches(要包含的匹配文档的最大数量)。 将以下内容添加到处理器中……



1.  PUT _ingest/pipeline/nginx_pipeline
2.  {
3.    "processors": [
4.      {
5.        "dissect": {
6.          "field": "message",
7.          "pattern": "%{remote_addr} - %{remote_user} [%{time_local}] \"%{request}\" %{status} %{body_bytes_sent} \"%{http_referer}\" \"%{http_user_agent}\" \"%{http_x_forwarded_for}\""
8.        }
9.      },
10.      {
11.        "date": {
12.          "field": "time_local",
13.          "formats": [
14.            "dd/MMM/yyyy:HH:mm:ss Z"
15.          ],
16.          "timezone": "Europe/Prague"
17.        }
18.      },
19.      {
20.        "enrich": {
21.          "policy_name": "ip_policy",
22.          "field": "remote_addr",
23.          "target_field": "meta.ip",
24.          "max_matches": "1"
25.        }
26.      }
27.    ]
28.  }


现在只需(重新)索引之前的文档(使用相同的 nginx 日志行)并提取日志内容,然后从我们的 ip_source_index 索引中丰富。



1.  DELETE nginx_index

3.  PUT nginx_index/_doc/1?pipeline=nginx_pipeline
4.  {
5.    "message": "172.17.0.1 - - [24/Dec/2019:10:09:42  0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\""
6.  }

8.  GET nginx_index/_search?filter_path=**.hits


上面命令返回的文档结果为:



1.  {
2.    "hits": {
3.      "hits": [
4.        {
5.          "_index": "nginx_index",
6.          "_id": "1",
7.          "_score": 1,
8.          "_source": {
9.            "remote_user": "-",
10.            "remote_addr": "172.17.0.1",
11.            "request": "GET / HTTP/1.1",
12.            "@timestamp": "2019-12-24T11:09:42.000 01:00",
13.            "http_referer": "-",
14.            "meta": {
15.              "ip": {
16.                "black_listed": false,
17.                "user_category": "test",
18.                "ip": "172.17.0.1"
19.              }
20.            },
21.            "body_bytes_sent": "95",
22.            "http_x_forwarded_for": "-",
23.            "time_local": "24/Dec/2019:10:09:42  0000",
24.            "message": "172.17.0.1 - - [24/Dec/2019:10:09:42  0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\"",
25.            "status": "200",
26.            "http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
27.          }
28.        }
29.      ]
30.    }
31.  }


“生产” 建议

最后,我想向你指出其他概念和功能,当你想在实际场景中开始使用管道时,摄取管道本身或与它们结合使用时很有用。

与索引模板一起使用

如果你 rollover 索引(在处理时间序列数据时可能应该这样做,大小超过数十 GB 等),甚至更多,如果你通过索引生命周期管理(我倾向于这样做)实现这些自动化,绝对推荐使用索引模板。 这些有助于为从索引模板创建的所有索引形式化配置(设置和映射)。

从管道的角度来看,你可以在设置中指定:

  • index.default_pipeline 如果在请求中没有直接指定其他管道(如果默认被覆盖),则默认应用管道。
  • 每次在默认管道或请求管道之后运行的 index.final_pipeline

在包含这些之前,只需确保你的管道存在,否则你的请求将失败。

管道模拟

绝对有用(当你的管道就位并需要执行一些更改时)是通过 _simulate API 测试这些。 你可以在请求正文中指定新的管道定义以及几个测试文档并获得结果,就好像这些已被处理一样。



1.  POST /_ingest/pipeline/_simulate
2.  {
3.      "pipeline": {
4.          "processors": [
5.              ...
6.          ]
7.      },
8.      "docs": [
9.          {
10.              "_index": "index",
11.              "_id": "id",
12.              "_source": {
13.                  // your doc here
14.              }
15.          }
16.      ]
17.  }


在上面的代码中,我已经展示了如何使用 _smulate 这个端点来测试你的管道。

故障处理

确保处理管道执行期间发生的最终故障。 通过在特定处理器级别或整个管道级别定义 on_failure 块(由其他一些处理器组成)来实现。

例如。 将有问题的文档传递给不同的索引。



1.  {
2.      "processors": [ … ],
3.      "on_failure": [
4.          {
5.              "set": {
6.                  "field": "_index",
7.                  "value": "failed-{{ _index }}"
8.              }
9.          }
10.      ]
11.  }


更多阅读,请参考我之前的另外一篇文章 “Elasticsearch:如何正确处理 Elasticsearch 摄取管道故障”。

空值的处理

对于需要条件执行的情况(即仅当字段存在时),请确保处理引用字段缺失或具有无效值的情况。 你可以通过某些处理器(转换、重命名、删除……)提供的 ignore_missing 属性或在 if 块中执行此操作。

"if": "ctx.containsKey('foo') && ctx['foo'].containsKey('bar')"

重建索引 - reindex

通常当你发现摄取管道时,你的现有索引中已经有大量数据。 要让新的闪亮管道处理这些数据,只需创建一个新索引并使用 reindex API 来复制数据。 要处理数据,请在索引设置或重新索引请求正文中指定管道。



1.  POST /_reindex
2.  {
3.      "source": {
4.          "index": "source-index-*"
5.      },
6.      "dest": {
7.          "index": "destination-index",
8.          "pipeline": "my_pipeline"
9.      }
10.  }


总结

我们研究了进气管道的各种功能并对其进行了测试。 希望你喜欢这个介绍,并且你可以在你的场景中看到一些潜力。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanfekhf
系列文章
更多 icon
同类精品
更多 icon
继续加载