Heka Filter AFD example

From noname.com.ua


This is an example of an AFD (AnomalyFaultDetection) plugin.
The main idea of the plugin is to:

  • count the number of incoming messages per second
  • calculate the average rate
  • write this rate to InfluxDB and chart it in Grafana


Collecting data

To collect the message rate we use a filter that is called on every incoming message.
Each process_message() call simply adds 1 to a circular buffer.

Circular buffer

Heka's Lua sandbox has a built-in Circular Buffer Library which can be used for data aggregation.
More details:


A circular buffer works like RRDtool (RRD stands for round-robin database) but stores its data in RAM.

The simplest buffer, created with <syntaxhighlight lang="lua">data1 = circular_buffer.new(60*24, 1, 60)</syntaxhighlight>, looks like this:

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  aN   |
|...  | ...   |
|-2s  |  a3   |
|-1s  |  a2   |
|Now()|  a1   |
+-----+-------+


Arguments

  • rows (unsigned): The number of rows in the buffer (must be > 1).
  • columns (unsigned): The number of columns in the buffer (must be > 0).
  • seconds_per_row (unsigned): The number of seconds each row represents (must be > 0).
  • enable_delta (bool, optional, default false): When true, the changes made to the circular buffer between delta outputs are tracked.

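This buffer therefore holds one day of data: 60*24 = 1440 rows of 60 seconds each. Note that with seconds_per_row = 60 each row aggregates one minute of messages; the diagrams on this page label rows in seconds, which corresponds to seconds_per_row = 1.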
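
To make the role of rows and seconds_per_row concrete, here is a minimal plain-Lua model of a time-indexed ring buffer. It is only a sketch and not Heka's circular_buffer implementation: the names new_buffer, buffer_add and buffer_get are hypothetical, it has a single column, and timestamps are assumed to be in whole seconds rather than nanoseconds.

```lua
-- Plain-Lua model of a time-indexed ring buffer (NOT Heka's circular_buffer).
-- All names here are hypothetical and exist only for illustration.
local NAN = 0/0

function new_buffer(rows, seconds_per_row)
  local buf = { rows = rows, spr = seconds_per_row, newest = 0, data = {} }
  for i = 1, rows do buf.data[i] = NAN end
  return buf
end

-- Number of the interval that a timestamp (in seconds) falls into.
local function interval_of(buf, ts)
  return math.floor(ts / buf.spr)
end

-- Add `value` to the row covering `ts`; returns the new row value,
-- or nil when `ts` is older than the time range held by the buffer.
function buffer_add(buf, ts, value)
  local iv = interval_of(buf, ts)
  if iv <= buf.newest - buf.rows then return nil end
  if iv > buf.newest then
    -- Time moved forward: reset the rows that are about to be reused.
    local first = math.max(buf.newest + 1, iv - buf.rows + 1)
    for i = first, iv do buf.data[i % buf.rows + 1] = NAN end
    buf.newest = iv
  end
  local slot = iv % buf.rows + 1
  local old = buf.data[slot]
  if old ~= old then old = 0 end  -- NaN means "nothing stored yet"
  buf.data[slot] = old + value
  return buf.data[slot]
end

-- Read the row covering `ts`; nil when `ts` is outside the buffer's range.
function buffer_get(buf, ts)
  local iv = interval_of(buf, ts)
  if iv > buf.newest or iv <= buf.newest - buf.rows then return nil end
  return buf.data[iv % buf.rows + 1]
end
```

With new_buffer(1440, 60), two calls to buffer_add with timestamps 120 and 150 land in the same row, because both fall into the same 60-second interval, while timestamp 180 starts a new row.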

Process messages

Each message that reaches the filter triggers a call to the process_message() function.
<syntaxhighlight lang="lua">function process_message()
    -- Timestamp of the incoming message, in nanoseconds
    local ts = read_message("Timestamp")
    -- Add 1 to column 1 of the row that covers this timestamp
    data1:add(ts, 1, 1)
    return 0
end</syntaxhighlight>
This function is very simple: it only populates the circular buffer data1.
It does exactly two things:

  • takes the timestamp of the message
  • adds 1 to the row corresponding to that timestamp

As a result, 3 seconds after start the data1 circular buffer looks like this:

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  nan  |
|-N+1s|  nan  |
|...  | ...   |
|-3s  |  nan  |
|-2s  |  121  |
|-1s  |  120  |
|Now()|  12   |
+-----+-------+

The current (Now()) row holds an incomplete message count because the current interval is still in progress. "nan" means "undefined value".
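
The effect of undefined rows and of the in-progress row can be reproduced outside Heka. The sketch below is an illustration in plain Lua, not part of the filter: count_per_second and avg_completed are hypothetical helpers, and timestamps are assumed to be in seconds (Heka itself uses nanoseconds).

```lua
-- Count messages per second for the window [start_ts, now_ts].
-- Rows without any message stay NaN, mirroring the "nan" cells above.
function count_per_second(timestamps, start_ts, now_ts)
  local counts = {}
  for s = start_ts, now_ts do counts[s] = 0/0 end
  for _, ts in ipairs(timestamps) do
    local s = math.floor(ts)
    if s >= start_ts and s <= now_ts then
      local old = counts[s]
      if old ~= old then old = 0 end  -- first message in this second
      counts[s] = old + 1
    end
  end
  return counts
end

-- Average over completed rows only: NaN rows and the row for now_ts
-- (still in progress) are skipped. Returns the average and the row count.
function avg_completed(counts, start_ts, now_ts)
  local sum, n = 0, 0
  for s = start_ts, now_ts - 1 do
    local v = counts[s]
    if v == v then  -- v == v is false only for NaN
      sum = sum + v
      n = n + 1
    end
  end
  if n == 0 then return nil, 0 end
  return sum / n, n
end
```

Skipping the newest row here plays the same role as removing the last sample from the average in the filter's timer_event() function.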

Processing collected data

<syntaxhighlight lang="lua">-- msg, debug_message, influxdb_message, alert_message, hostname,
-- critical_delta and prev_time() are assumed to be defined elsewhere
-- in the filter; they are not shown in this snippet.
function timer_event(ns)
    msg.Fields['lua_heartbeat_2'] = 'True'
    msg.Timestamp = ns
    inject_payload("cbuf_lua_test1", "", data1)

    avg, active_rows = data1:compute("avg", 1)
    if active_rows > 2 then
        -- avg_real is the average without the LAST sample (a1), because
        -- the last row may not be fully populated yet.
        --   avg      = (a1 + a2 + ... + aN) / N
        --   avg_real = (a2 + ... + aN) / (N - 1)
        -- Since a1 + ... + aN = N * avg:
        --   avg_real = (N * avg - a1) / (N - 1)
        --            = (avg - a1/N) * (N / (N - 1))
        -- where N = active_rows and a1 = data1:get(ns, 1) (last value).
        a1 = data1:get(ns, 1)
        if a1 == nil then
            -- nil means 'no messages' = 0 messages
            a1 = 0
        end
        N = active_rows
        avg_real = (avg - (a1 / N)) * (N / (N - 1))

        -- Compare the last completed row with the corrected average
        current_percents = (data1:get(prev_time(1, ns, data1), 1) * 100) / avg_real
        delta_percents = math.abs(100 - current_percents)

        debug_message.Timestamp = ns
        debug_message.Fields['AVG_messages']      = avg
        debug_message.Fields['AVG_messages_real'] = avg_real
        debug_message.Fields['active_rows']       = active_rows
        debug_message.Fields['current_percents']  = current_percents
        debug_message.Fields['delta_percents']    = delta_percents
        debug_message.Fields['critical_delta']    = critical_delta
        debug_message.Fields['is_alert']          = 'False'
        inject_message(debug_message)

        influxdb_message.Timestamp = ns
        influxdb_message.Fields['payload_type'] = 'txt'
        influxdb_message.Fields['payload_name'] = 'influxdb'
        -- Time is in ms (ns is in nanoseconds)
        influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname
            .. " value=" .. avg_real .. " " .. math.floor(ns / 1000000) .. "\n"
        inject_message(influxdb_message)

        if delta_percents > critical_delta then
            alert_message.Timestamp = ns
            alert_message.Fields['AVG_messages']      = avg
            alert_message.Fields['AVG_messages_real'] = avg_real
            alert_message.Fields['active_rows']       = active_rows
            alert_message.Fields['current_percents']  = current_percents
            alert_message.Fields['delta_percents']    = delta_percents
            alert_message.Fields['critical_delta']    = critical_delta
            alert_message.Fields['is_alert']          = 'True'
            inject_message(alert_message)
        end
    end
    inject_message(msg)
    -- return 0
end</syntaxhighlight>
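
The arithmetic used in timer_event() can be checked in isolation. The following plain-Lua sketch restates the two formulas as standalone functions; the names avg_without_latest and deviation_percent are hypothetical and do not appear in the filter.

```lua
-- avg: mean over all n active rows, including the newest row
-- a1:  value of the newest (possibly incomplete) row
-- Returns the mean of the remaining n - 1 rows:
--   (n * avg - a1) / (n - 1) = (avg - a1/n) * (n / (n - 1))
function avg_without_latest(avg, a1, n)
  assert(n > 1, "need at least two active rows")
  return (avg - a1 / n) * (n / (n - 1))
end

-- Absolute deviation of `value` from `baseline`, in percent of baseline.
function deviation_percent(value, baseline)
  return math.abs(100 - value * 100 / baseline)
end

-- Rows from the diagram above: 121 and 120 are complete, 12 is in progress.
local avg = (121 + 120 + 12) / 3
local avg_real = avg_without_latest(avg, 12, 3)  -- mean of 121 and 120
local delta = deviation_percent(120, avg_real)   -- deviation of the last full row
```

With the values from the diagram, the plain average is pulled down to roughly 84 by the incomplete row, while the corrected average is 120.5, so the last completed row (120) deviates by less than half a percent.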

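Loopback error

With the configuration below, message_matcher = "TRUE" matches every message, including the messages the filter injects itself. Heka detects this loop and terminates the plugin. Restricting the matcher so that it excludes the filter's own output (for example by Type or Logger) avoids the error.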

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "TRUE"
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 1

Resulting Heka log:

2016/02/03 18:39:40 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher)
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.
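
The reason for the "circular reference" error can be illustrated with a small plain-Lua model. This is a sketch, not Heka code: the matcher functions are hypothetical stand-ins for message_matcher expressions, and it assumes that messages injected by a sandbox filter carry the plugin name in their Logger field.

```lua
-- Stand-in for message_matcher = "TRUE": accepts every message.
function match_all(msg)
  return true
end

-- Stand-in for a matcher that rejects the filter's own output.
function match_not_self(msg)
  return msg.Logger ~= "heartbeat_filter_lua"
end

-- Would a message injected by `plugin_name` be routed back to that plugin?
function creates_loop(matcher, plugin_name)
  -- Assumption: injected messages are tagged with the plugin name as Logger.
  local injected = { Logger = plugin_name, Type = "heka.sandbox.example" }
  return matcher(injected)
end
```

In this model creates_loop(match_all, "heartbeat_filter_lua") is true, which corresponds to the log above, while creates_loop(match_not_self, "heartbeat_filter_lua") is false.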