Heka Filter afd example: различия между версиями

Материал из noname.com.ua
Перейти к навигацииПерейти к поиску
Строка 74: Строка 74:
 
in current (now()) raw we have invalid number of messages because "current second" is still "in progress". "nan" means "undefined value"
 
in current (now()) raw we have invalid number of messages because "current second" is still "in progress". "nan" means "undefined value"
 
==Processing collected data==
 
==Processing collected data==
  +
The main data processing is doing timer_event(ns) function.
  +
This function is called every ticker_interval
  +
<BR>
  +
<BR>This function is doing the following:
  +
* log data1 circular buffer for debug with inject_payload() call. Logged message looks like:
  +
<PRE>
  +
:Timestamp: 2016-02-05 12:45:31.985034391 +0000 UTC
  +
:Type: heka.sandbox-output
  +
:Hostname: node-6
  +
:Pid: 26318
  +
:Uuid: 6457c1de-87ef-4069-ac57-051ecdb73bca
  +
:Logger: heartbeat_filter_lua
  +
:Payload: {"time":1454589960,"rows":1440,"columns":1,"seconds_per_row":60,"column_info":[{"name":"Messages","unit":"count","aggregation":"sum"}]}
  +
nan
  +
nan
  +
nan
  +
<SKIP>
  +
85
  +
86
  +
51
   
  +
:EnvVersion:
 
  +
:Severity: 7
  +
:Fields:
  +
| name:"payload_type" type:string value:"cbuf_lua_test1" representation:"file-extension"
  +
| name:"payload_name" type:string value:""
  +
</PRE>
   
   

Версия 15:46, 5 февраля 2016

Heka Filter AFD example

Here is example of AFD (AnomalyFaultDetection) plugin.
The main idea plugin is:

  • count number of incoming messages per second
  • calculate average rate
  • put this rate into InfluxDB and create chart in Grafana


Collecting data

To collect message rate we are using filter which called on each incoming message.
Each process_message() call just add 1 to circular buffer.

Cirular buffer

Heka's LUA SandBox has bilt-in Circular Buffer Library ehich can be used for data aggregation.
More details:


Circular buffer works like RRDTools (RRD is round-robin archive) but store data in RAM.

Simplest buffer created with <syntaxhighlight lang="lua">data1 = circular_buffer.new(60*24, 1, 60)</syntaxhighlight> looks like:

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  aN   |
|...  | ...   |
|-2s  |  a3   |
|-1s  |  a2   |
|Now()|  a1   |
+-----+-------+


Arguments

  • rows (unsigned) The number of rows in the buffer (must be > 1)
  • columns (unsigned)The number of columns in the buffer (must be > 0)
  • seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
  • enable_delta (bool optional, default false) When true the changes made to the circular buffer between delta outputs are tracked.

So here we created buffer to save data during 1 day (60min*24h)

Process messages

Each message comes to filter is calling process_message() function. <syntaxhighlight lang="lua">function process_message()

   local ts = read_message("Timestamp")
   data1:add(ts, 1, 1)
   return 0

end</syntaxhighlight> This function is very simple, it only populates circular buffer data1.
What exactly it does:

  • take timestamp of message
  • add 1 to raw corresponding to the timestamp

As result in 3 seconds after start data1 circular buffer looks like

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  nan  |
|-N+1s|  nan  |
|...  | ...   |
|-3s  |  nan  |
|-2s  |  121  |
|-1s  |  120  |
|Now()|  12   |
+-----+-------+

in current (now()) raw we have invalid number of messages because "current second" is still "in progress". "nan" means "undefined value"

Processing collected data

The main data processing is doing timer_event(ns) function. This function is called every ticker_interval

This function is doing the following:

  • log data1 circular buffer for debug with inject_payload() call. Logged message looks like:
:Timestamp: 2016-02-05 12:45:31.985034391 +0000 UTC
:Type: heka.sandbox-output
:Hostname: node-6
:Pid: 26318
:Uuid: 6457c1de-87ef-4069-ac57-051ecdb73bca
:Logger: heartbeat_filter_lua
:Payload: {"time":1454589960,"rows":1440,"columns":1,"seconds_per_row":60,"column_info":[{"name":"Messages","unit":"count","aggregation":"sum"}]}
nan
nan
nan
<SKIP>
85
86
51

:EnvVersion:
:Severity: 7
:Fields:
    | name:"payload_type" type:string value:"cbuf_lua_test1" representation:"file-extension"
    | name:"payload_name" type:string value:""


<syntaxhighlight lang="lua">function timer_event(ns)

 local Payload
 msg.Fields['lua_heartbeat_2'] = 'True'
 msg.Timestamp = ns
 inject_payload("cbuf_lua_test1", "", data1)
 avg, active_rows = data1:compute("avg", 1)
 if active_rows > 2 then

-- avg_real - avg without LAST sample -- avg = ( a1 + a2 + ... + aN ) / N -- We need to find ( a2 + ... +aN ) / (N -1) because last value may be not populated now --. --- avg_real = (a1 + a2 + ... + aN - a1/N ) * ( N/(N-1) ) -- a1 + .. + aN = avg, so: -- avg_real = ( avg - a1/N ) * ( N/( N - 1 ) ) -- N = active_rows -- a1 = data1:get(ns, 1) (Last value)

   a1 = data1:get(ns, 1)
   if a1 == nil then

-- nil means 'no messages' = 0 messages

     a1 = 0
   end
   N = active_rows

.

   avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
   current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
   delta_percents = math.abs(100 - current_percents )
   debug_message.Timestamp = ns
   debug_message.Fields['AVG_messages'] = avg
   debug_message.Fields['AVG_messages_real'] = avg_real
   debug_message.Fields['active_rows'] = active_rows
   debug_message.Fields['current_percents'] = current_percents
   debug_message.Fields['delta_percents']   = delta_percents
   debug_message.Fields['critical_delta']   = critical_delta
   debug_message.Fields['is_alert']         = 'False'
   inject_message(debug_message)
   influxdb_message.Timestamp = ns
   influxdb_message.Fields['payload_type'] = 'txt'
   influxdb_message.Fields['payload_name'] = 'influxdb'

-- Time is in ms

   influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000)  .. "\n"
   inject_message(influxdb_message)
   if delta_percents > critical_delta then
     alert_message.Timestamp = ns
     alert_message.Fields['AVG_messages']       = avg
     alert_message.Fields['AVG_messages_real']  = avg_real
     alert_message.Fields['active_rows']      = active_rows
     alert_message.Fields['current_percents'] = current_percents
     alert_message.Fields['delta_percents']   = delta_percents
     alert_message.Fields['critical_delta']   = critical_delta
     alert_message.Fields['is_alert']         = 'True'
     inject_message(alert_message)
   end
 end
inject_message(msg)

-- return 0 end</syntaxhighlight>

Loopback error

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "TRUE"
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 1
2016/02/03 18:39:40 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher)
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.