본문 바로가기
NIFI

NIFI QUEUE MONITORING (use controller service)

by kyeongseo.oh 2022. 2. 26.

template : https://kyeongseo.tistory.com/entry/nifi-template

 

특정 프로세서에서 bottleneck이 걸려 flowfile이 몰려있는 경우 slack으로 메시지를 보낼 수 있도록 함

 

[전체 flow]

 

[flow 설명]

  • queryrecord

json에서 필요한 정보만 parsing함

backpressure가 활성화된 connection만 parsing

 

  • mergerecord

중요) queryrecord와 mergerecord 사이의 connection은 반드시 single node로 setting해줘야함. 모든 flowfile이 하나의 node로 가야 제대로 merge된다.

 

flowfile이 노드 수만큼 생성된다. 

중복제거를 위해 flowfile을 모두 합쳐준다.

약간의 간격을 가지고 flowfile이 들어오기에 max bin age를 10 seconds로 설정

 

  • queryrecord

동일 componentId 중 가장 최신 데이터만 추출

timestamp가 예약어인 관계로 "timestamp"로 써야함

그냥 group by componentId 해도 상관없음

 

  • extracttext

json결과를 attribute로 추출함

maximum capture group length가 작으면 결과가 잘리기 때문에 적절하게 올려준다.

 

  • putslack

결과를 slack으로 전송

 

 

[queue 정보 가져오는 방법]

우선 input port를 하나 생성한다.

input port의 이름은 s2sinput으로 하였음

controller settings을 클릭한다.

 

nifi 전체 flow의 connect 정보를 모두 받아오기 위해서는 SiteToSiteStatusReportingTask를 추가해아함

controller settings -> reporting tasks -> SiteToSiteStatusReportingTask

 

기본으로 5분에 한번씩 connection 정보를 수집함

필요한 경우 적절하게 변경 가능

 

다음과 같이 config를 입력한다.

destination url은 정보를 받을 nifi node의 url

input port name은 위에서 만든 input port로 설정

transport protocol은 http로 설정. raw로 하려면 별도 socket port 등을 설정해야함

component type filter regex 기본값으로 여러 개가 있는 데 connect 정보만 받아올 예정이기에 connection 제외하고

전부 제거하였음 

 

 

설정 후 SiteToSiteStatusReportingTask를 시작하면 다음과 같이 input port를 통해 connection 정보가 들어옴.

nifi가 4대로 cluster 되있어서 4개의 flowfile이 들어옴

 

list queue로 보면 다음과 같은 정보가 들어있음을 확인할 수 있다.

 

테스트를 위해 backpressure를 발생시킨 후 slack message를 확인해보면 아래와 같이 메시지가 온 것을 볼 수 있음

 

메시지 중 componentId의 값을복사해 nifi에서 검색하면 bottleneck이 걸린 connection을 찾을 수 있다.

 

[해결 방안]

  • 단순히 flowfile의 크기가 커서 back pressure가 발생하는 경우

큰 파일을 fetch하는 작업의 경우 가져오는 파일의 크기가 커서 back pressure가 발생할 수 있음

 

이런 경우에는 파일 크기에 맞춰 Size Threshold 증가시켜야함

 

  • 프로세서의 처리가 늦어 back pressure가 발생하는 경우

1. Load Balance Strategy를 partition by attribute or round robin으로 변경

2. Concurrent Tasks를 적당히 올려준다. (db에 붙는 프로세서 같은 경우 너무 올리면 db가 죽을 수 있음)

 

그냥 위 방안을 모두 적용해도 됨

댓글