Heka Filter AFD example

Here is an example of an AFD (Anomaly and Fault Detection) plugin.
The main idea of the plugin is:

  • count the number of incoming messages per second
  • calculate the average rate
  • put this rate into InfluxDB and build a chart in Grafana


Collecting data

To collect the message rate we use a filter that is called on each incoming message.
Each process_message() call just adds 1 to a circular buffer.

Circular buffer

Heka's Lua sandbox has a built-in circular buffer library which can be used for data aggregation.

A circular buffer works like RRDtool (a round-robin archive) but stores its data in RAM.

The simplest buffer, created with <syntaxhighlight lang="lua">data1 = circular_buffer.new(60*24, 1, 60)</syntaxhighlight>, looks like this:

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  aN   |
|...  | ...   |
|-2s  |  a3   |
|-1s  |  a2   |
|Now()|  a1   |
+-----+-------+


Arguments

  • rows (unsigned) The number of rows in the buffer (must be > 1)
  • columns (unsigned) The number of columns in the buffer (must be > 0)
  • seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
  • enable_delta (bool optional, default false) When true the changes made to the circular buffer between delta outputs are tracked.

So here we created a buffer that keeps data for 1 day: 60*24 = 1440 rows, one row per 60 seconds.
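
As a minimal sketch (assuming Heka's Lua sandbox, where require "circular_buffer" makes the module available as a global), this is how such a buffer is created and its single column is labeled; the same calls appear in the full plugin code below:

<syntaxhighlight lang="lua">
require "circular_buffer"

-- rows = 60*24 = 1440, columns = 1, seconds_per_row = 60  =>  24 hours of history
data1 = circular_buffer.new(60*24, 1, 60)

-- give column 1 a name, a unit and an aggregation method ("sum")
data1:set_header(1, "Messages", "count", "sum")
</syntaxhighlight>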

Process messages

Each message that reaches the filter triggers a call to the process_message() function.

<syntaxhighlight lang="lua">
function process_message()
   local ts = read_message("Timestamp")
   data1:add(ts, 1, 1)
   return 0
end
</syntaxhighlight>

This function is very simple; it only populates the circular buffer data1.
What exactly it does:

  • take the timestamp of the message
  • add 1 to the row corresponding to that timestamp

As a result, a few seconds after start the data1 circular buffer looks like this:

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  nan  |
|-N+1s|  nan  |
|...  | ...   |
|-3s  |  nan  |
|-2s  |  121  |
|-1s  |  120  |
|Now()|  12   |
+-----+-------+

In the current (now()) row we have an invalid number of messages because the "current second" is still in progress. "nan" means "undefined value".
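
For illustration only (a sketch re-using the data1 buffer from above; ns stands for any timestamp in nanoseconds), a single row can be read back with get(), and the row that covers ns is exactly the one that is still being filled:

<syntaxhighlight lang="lua">
-- the newest row is still incomplete, older rows are final
local in_progress = data1:get(ns, 1)                     -- e.g. 12, still growing
local complete    = data1:get(ns - 1000000000 * 60, 1)   -- e.g. 120, one row (60 s) earlier
</syntaxhighlight>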

Processing collected data

The main data processing is done by the timer_event(ns) function, which is called every ticker_interval.

This function does the following:

  • logs the data1 circular buffer for debugging with an inject_payload() call. The logged message looks like:
:Timestamp: 2016-02-05 12:45:31.985034391 +0000 UTC
:Type: heka.sandbox-output
:Hostname: node-6
:Pid: 26318
:Uuid: 6457c1de-87ef-4069-ac57-051ecdb73bca
:Logger: heartbeat_filter_lua
:Payload: {"time":1454589960,"rows":1440,"columns":1,"seconds_per_row":60,"column_info":[{"name":"Messages","unit":"count","aggregation":"sum"}]}
nan
nan
nan
<SKIP>
85
86
51

:EnvVersion:
:Severity: 7
:Fields:
    | name:"payload_type" type:string value:"cbuf_lua_test1" representation:"file-extension"
    | name:"payload_name" type:string value:""

So you can see the collected data in the :Payload field.

  • Calculates the average message rate. It may look a bit complicated, but pay attention: we DO NOT have a complete count for the "currently running" second.

So instead of average = (a1 + ... + aN)/N we need (a2 + ... + aN)/(N-1), where a1 is the still-incomplete current row (see the sketch after this list).

  • Calculates the difference in percent between the current message rate and the average, and creates an alarm if needed (for future use in alerting).
  • Creates a debug message.
  • Creates a message to be processed by the influxdb output.
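
A standalone sketch of this correction (the helper name corrected_avg is mine and is not part of the plugin; timer_event() below inlines the same formula):

<syntaxhighlight lang="lua">
-- avg: mean over n active rows, including the partial current row a1
-- returns (a2 + ... + aN) / (n - 1), i.e. the mean over complete rows only
local function corrected_avg(avg, a1, n)
  if n < 2 then return avg end
  -- (a1 + ... + aN) = avg * n, so drop a1 and divide by (n - 1)
  return (avg * n - a1) / (n - 1)
end

-- example: rows {12, 120, 121}, where 12 is the partial current row
-- avg = (12 + 120 + 121) / 3 ≈ 84.3; corrected_avg(84.333, 12, 3) = (253 - 12) / 2 = 120.5
</syntaxhighlight>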

The prev_time() function calculates "the time some steps ago" in circular buffer terms. We need it because the circular buffer library works in seconds (seconds_per_row), while the message timestamp is in nanoseconds.
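
For example (using the prev_time() helper defined in the plugin code below), with seconds_per_row = 60 stepping one row back from a nanosecond timestamp ns is a plain subtraction, and the result can be passed straight to get():

<syntaxhighlight lang="lua">
-- prev_time(1, ns, data1) == ns - 1000000000 * 60 for this buffer
local last_complete_row = data1:get(prev_time(1, ns, data1), 1)
</syntaxhighlight>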


<syntaxhighlight lang="lua">function timer_event(ns)

 local Payload
 msg.Fields['lua_heartbeat_2'] = 'True'
 msg.Timestamp = ns
 inject_payload("cbuf_lua_test1", "", data1)
 avg, active_rows = data1:compute("avg", 1)
 if active_rows > 2 then

   -- avg_real: avg without the LAST sample
   -- avg = ( a1 + a2 + ... + aN ) / N
   -- We need ( a2 + ... + aN ) / ( N - 1 ) because the last value (a1) may not be fully populated yet:
   -- avg_real = ( a1 + a2 + ... + aN - a1 ) / ( N - 1 )
   --          = ( avg * N - a1 ) / ( N - 1 )
   --          = ( avg - a1/N ) * ( N / ( N - 1 ) )
   -- N  = active_rows
   -- a1 = data1:get(ns, 1)   (last value)

   a1 = data1:get(ns, 1)
   if a1 == nil then

-- nil means 'no messages' = 0 messages

     a1 = 0
   end
   N = active_rows


   avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
   current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
   delta_percents = math.abs(100 - current_percents )
   debug_message.Timestamp = ns
   debug_message.Fields['AVG_messages'] = avg
   debug_message.Fields['AVG_messages_real'] = avg_real
   debug_message.Fields['active_rows'] = active_rows
   debug_message.Fields['current_percents'] = current_percents
   debug_message.Fields['delta_percents']   = delta_percents
   debug_message.Fields['critical_delta']   = critical_delta
   debug_message.Fields['is_alert']         = 'False'
   inject_message(debug_message)
   influxdb_message.Timestamp = ns
   influxdb_message.Fields['payload_type'] = 'txt'
   influxdb_message.Fields['payload_name'] = 'influxdb'

-- Time is in ms

   influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000)  .. "\n"
   inject_message(influxdb_message)
   if delta_percents > critical_delta then
     alert_message.Timestamp = ns
     alert_message.Fields['AVG_messages']       = avg
     alert_message.Fields['AVG_messages_real']  = avg_real
     alert_message.Fields['active_rows']      = active_rows
     alert_message.Fields['current_percents'] = current_percents
     alert_message.Fields['delta_percents']   = delta_percents
     alert_message.Fields['critical_delta']   = critical_delta
     alert_message.Fields['is_alert']         = 'True'
     inject_message(alert_message)
   end
 end
inject_message(msg)

 -- return 0
end</syntaxhighlight>

Messages generated with the inject_message(influxdb_message) call will be processed by the influxdb output. These messages look like:

:Timestamp: 2016-02-05 08:00:03.152148992 +0000 UTC
:Type: heka.sandbox.HEARTBEAT
:Hostname: node-6
:Pid: 0
:Uuid: b3b3319c-0db6-47fa-a2d5-55877d489156
:Logger: heartbeat_filter_lua
:Payload: AVG_messages_real,deployment_id=3,hostname=node-6 value=51 1454659203152

:EnvVersion:
:Severity: 6
:Fields:
    | name:"payload_type" type:string value:"txt"
    | name:"payload_name" type:string value:"influxdb"

Plugin Configuration

I use the following plugin configuration:

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "Type  !~ /HEARTBEAT/ && Fields[payload_type] !~ /cbuf_lua_test1/"
#ticker_interval = 120
#preserve_data = false
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 1
    hostname = "node-6"

Loopback error

Please pay attention: it is possible to create a loop. The filter generates a message, may be called on that message, generate a message again, and so on. So if we use message_matcher = "TRUE" (which means "all messages") we get an endless loop:

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "TRUE"
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 1


2016/02/03 18:39:40 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher)
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.


InfluxDB

The next step is to check the data in InfluxDB.

Grafana

Plugin code

<syntaxhighlight lang="lua">require "string" require "math"

require "circular_buffer"


-- 60 min * 24 h = 1 day of history, 1 column, 60 seconds per row
data1 = circular_buffer.new(60*24, 1, 60)


local COUNT1 = data1:set_header(1, "Messages", "count", "sum")

local critical_delta = read_config('critical_delta') or error('critical_delta must be specified!')

local hostname = read_config('hostname') or error('hostname must be specified!')


local c = 0

local msg = {

   Type = "HEARTBEAT",
   Timestamp = nil,
   Severity = 6,
   Fields = {}

}


local debug_message = {

   Type = "HEARTBEAT",
   Timestamp = nil,
   Severity = 6,
   Fields = {}

}


local alert_message = {

   Type = "HEARTBEAT",
   Timestamp = nil,
   Severity = 6,
   Fields = {}

}

local influxdb_message = {

   Type = "HEARTBEAT",
   Timestamp = nil,
   Severity = 6,
   Fields = {},
   Payload = ""

}


function prev_time(step, ts, cbuf)

 rows, columns, seconds_per_row = cbuf:get_configuration()
 return ts - (1000000000 * seconds_per_row * step)

end

function process_message()

   local ts = read_message("Timestamp")
   data1:add(ts, 1, 1)
   return 0

end


function timer_event(ns)

 local Payload
 msg.Fields['lua_heartbeat_2'] = 'True'
 msg.Timestamp = ns
 inject_payload("cbuf_lua_test1", "", data1)
 avg, active_rows = data1:compute("avg", 1)
 if active_rows > 2 then

   -- avg_real: avg without the LAST sample
   -- avg = ( a1 + a2 + ... + aN ) / N
   -- We need ( a2 + ... + aN ) / ( N - 1 ) because the last value (a1) may not be fully populated yet:
   -- avg_real = ( a1 + a2 + ... + aN - a1 ) / ( N - 1 )
   --          = ( avg * N - a1 ) / ( N - 1 )
   --          = ( avg - a1/N ) * ( N / ( N - 1 ) )
   -- N  = active_rows
   -- a1 = data1:get(ns, 1)   (last value)

   a1 = data1:get(ns, 1)
   if a1 == nil then

-- nil means 'no messages' = 0 messages

     a1 = 0
   end
   N = active_rows
   avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
   current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
   delta_percents = math.abs(100 - current_percents )
   debug_message.Timestamp = ns
   debug_message.Fields['AVG_messages'] = avg
   debug_message.Fields['AVG_messages_real'] = avg_real
   debug_message.Fields['active_rows'] = active_rows
   debug_message.Fields['current_percents'] = current_percents
   debug_message.Fields['delta_percents']   = delta_percents
   debug_message.Fields['critical_delta']   = critical_delta
   debug_message.Fields['is_alert']         = 'False'
   inject_message(debug_message)
   influxdb_message.Timestamp = ns
   influxdb_message.Fields['payload_type'] = 'txt'
   influxdb_message.Fields['payload_name'] = 'influxdb'

-- Time is in ms

   influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000)  .. "\n"
   inject_message(influxdb_message)
   if delta_percents > critical_delta then
     alert_message.Timestamp = ns
     alert_message.Fields['AVG_messages']       = avg
     alert_message.Fields['AVG_messages_real']  = avg_real
     alert_message.Fields['active_rows']      = active_rows
     alert_message.Fields['current_percents'] = current_percents
     alert_message.Fields['delta_percents']   = delta_percents
     alert_message.Fields['critical_delta']   = critical_delta
     alert_message.Fields['is_alert']         = 'True'
     inject_message(alert_message)
   end
 end
inject_message(msg)

 -- return 0
end</syntaxhighlight>