뒤로가기

Elastic

Ingest PipeLine 설정을 통한 데이터 전처리

하선호 2020.09.04.

1. 개요

ElasticSearch로 색인하다보면 필요에 따라 데이터를 가공해야할 필요가 있습니다. 예를 들어 구분값으로 나눠진 데이터를 집계해야 할 때, HTML 태그가 포함된 필드를 색인 할 때 등 그래서 이러한 데이터를 별도의 프로세스를 통해 가공하여 색인했었는데요. 인덱스 별로 가공해야 할 필드가 다르고 필드명 또한 달라서 매번 인덱스에 해당하는 프로세스를 만들고, 배포하는 작업이 매우 불편할 것 같았습니다. 그런 상태에서 Pipelineprocessor에 대한 조언을 받아 알아보게 되었습니다. Pipeline은 아래 링크를 통해 확인 하실 수 있습니다.

2. PipeLine

우선 PipeLine 설정하기 위해서는 Ingest Node가 활성화 되어있어야 합니다. PipeLine 설정은 다음와 같습니다.

PUT _ingest/pipline/{pipline name}
{
  "description" : "...",
  "processors" : [ ... ]
}

description 은 해당 pipeLine에 대한 설명 필드이며 processors에는 pipeLine에서 수행할 processors에 대한 정의를 작성하는 필드입니다. 정의한 순서대로 수행되며 여러개의 processors를 등록할 수 있습니다.

3. Processor

processorsPROCESSOR_NAME는 커스텀한 네임이 아닌 processors 별 지정된 이름과 함께 지정된 옵션을 정의해주시면 됩니다.

{
  "PROCESSOR_NAME" : {
    ... processor configuration options ...
  }
}

3-1. Split processor

첫번째로 사용해본 processorSplit processor입니다. 정의된 필드의 데이터를 구분자로 분해하여 배열형태로 만들어줍니다.

PUT _ingest/pipeline/split_field
{
  "description": "Decompose the field by separator",
  "processors": [
    {
      "split": {
        "field": "productList",
        "separator": ","
      }
    }
  ]
}

_simulate API를 통해서 생성할 processor를 확인해 볼 수도 있습니다. 위에 설명드린대로 pipeline 필드를 작성하고 doc - _source 필드에 값을 입력하면 됩니다.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Decompose the field by separator",
    "processors": [
      {
        "split": {
          "field": "productList",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "productList": "1,2,3,4,5"
      }
    }
  ]
}

위 처럼 _simulate를 통해 아래처럼 입력된 값이 구분자를 통해 배열로 출력되는걸 확인할 수 있습니다.

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "productList" : [
            "1",
            "2",
            "3",
            "4",
            "5"
          ]
        },
        "_ingest" : {
          "timestamp" : "2020-09-04T02:23:40.241192Z"
        }
      }
    }
  ]
}

3-1. HTML strip processor

두번째로 사용해볼 processorHTML strip processor입니다. 필드의 HTML Tag를 제거해줍니다.

PUT _ingest/pipeline/html_strip
{
  "description": "Remove Html Tag", 
  "processors": [
    {
      "html_strip": {
        "field": "contents"
      }
    }
  ]
}

_simulate를 통해 확인해보면 <em>:</em> 태그가 제거되었습니다.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Remove Html Tag",
    "processors": [
      {
        "html_strip": {
          "field": "contents"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "contents": "<em>다나와 ElasticSearch</em>"
      }
    }
  ]
}

4. Index 적용

정의한 PipeLine 을 적용하여 색인 할 때는 index API 뒤에 pipeline={pipeline_name} 을 붙여 호출하면 됩니다.

아래 예제는 PipeLine 정의 후 색인까지의 예제입니다.

## 인덱스 생성
PUT sample_index
{
  "mappings": {
    "properties": {
      "boardContents": {
        "type": "text"
      },
      "productSeqList": {
        "type": "keyword"
      }
    }
  }
}


## PipeLine 정의
PUT _ingest/pipeline/sample_pipeLine
{
  "description": "sample", 
  "processors": [
    {
      "html_strip": {
        "field": "boardContents"
      }
    },
    {
      "trim": {
        "field": "productSeqList"
      }
    },
    {
      "split": {
        "field": "productSeqList",
        "separator": ","
      }
    }
  ]
}

##색인
POST sample_index/_doc?pipeline=sample_pipeLine
{
  "boardContents" : "<em>sample 제목</em>",
  "productSeqList" : "001,002,003"
}

결과

    {
        "_index" : "sample_index",
        "_type" : "_doc",
        "_id" : "RjRaV3QBZ9BimQhnrQr8",
        "_score" : 1.0,
        "_source" : {
          "boardContents" : "sample 제목",
          "productSeqList" : [
            "001",
            "002",
            "003"
          ]
        }
      }

5. 결론

인덱스마다 별도의 데이터 전처리하는 과정을 따로 만드는게 번거롭고 관리에 있어서 불편하다 생각했는데 이번에 PipeLine을 알게되면서 더 용이한 방법으로 색인전 전처리를 할 수 있었습니다. 설명한 2개의 processor뿐 아니라 csv, grok 등 다양한 processor가 있으며 이를 통해 더 색인시 다양한 데이터 처리를 할수 있습니다

참고