Heka Filter AFD example
This is an example of an AFD (Anomaly and Fault Detection) plugin.
The main idea of the plugin is to:
- count the number of incoming messages per second
- calculate the average rate
- write this rate to InfluxDB and chart it in Grafana
Collecting data
To collect the message rate we use a filter that is called on each incoming message.
Each process_message() call just adds 1 to a circular buffer.
Circular buffer
Heka's Lua sandbox has a built-in Circular Buffer Library which can be used for data aggregation.
More details:
- https://github.com/mozilla-services/lua_circular_buffer/blob/master/README.md
- https://github.com/mozilla-services/lua_sandbox/
- http://hekad.readthedocs.org/en/latest/sandbox/
A circular buffer works like RRDtool (RRD stands for round-robin database) but stores its data in RAM.
The simplest buffer, created with <syntaxhighlight lang="lua">data1 = circular_buffer.new(60*24, 1, 60)</syntaxhighlight>, looks like this:
 data1
 +-----+-------+
 |Time | Data1 |
 +-----+-------+
 |-Ns  | aN    |
 |...  | ...   |
 |-2s  | a3    |
 |-1s  | a2    |
 |Now()| a1    |
 +-----+-------+
Arguments
- rows (unsigned) The number of rows in the buffer (must be > 1)
- columns (unsigned) The number of columns in the buffer (must be > 0)
- seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
- enable_delta (bool optional, default false) When true the changes made to the circular buffer between delta outputs are tracked.
So here we created a buffer with one column that keeps one day of data: each row represents 60 seconds, and 60*24 = 1440 rows cover 24 hours.
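To make the row mechanics concrete, here is a minimal pure-Lua sketch of a time-indexed ring buffer. It is not the real circular_buffer module: the RingBuffer name, its single column, and the way stale rows are reset on write are all assumptions chosen for illustration. It only shows how a timestamp can be mapped to a row and how old rows are reused.

```lua
-- Simplified stand-in for a time-series ring buffer (illustration only,
-- NOT the lua_circular_buffer implementation; single column).
local RingBuffer = {}
RingBuffer.__index = RingBuffer

local NAN = 0 / 0

function RingBuffer.new(rows, seconds_per_row)
    assert(rows > 1 and seconds_per_row > 0)
    local self = setmetatable({}, RingBuffer)
    self.rows = rows
    self.seconds_per_row = seconds_per_row
    self.values = {}  -- value stored at each array index
    self.slots = {}   -- absolute interval number held by each array index
    return self
end

-- Map a nanosecond timestamp to (array index, absolute interval number).
function RingBuffer:locate(ns)
    local interval = math.floor(ns / 1e9 / self.seconds_per_row)
    return interval % self.rows + 1, interval
end

-- Add a value to the row that covers the timestamp.
-- Returns the new row value, or nil if the timestamp is older than the window.
function RingBuffer:add(ns, value)
    local idx, interval = self:locate(ns)
    local held = self.slots[idx]
    if held ~= nil and interval < held then
        return nil  -- the row has already been reused for newer data
    end
    if held ~= interval then
        -- the index is empty or holds an older interval: start a fresh row
        self.slots[idx] = interval
        self.values[idx] = 0
    end
    self.values[idx] = self.values[idx] + value
    return self.values[idx]
end

-- Read the row that covers the timestamp; NaN means "undefined value".
function RingBuffer:get(ns)
    local idx, interval = self:locate(ns)
    if self.slots[idx] ~= interval then
        return NAN
    end
    return self.values[idx]
end

-- One day of one-minute rows, mirroring circular_buffer.new(60*24, 1, 60).
local buf = RingBuffer.new(60 * 24, 60)
local minute = 60 * 1e9         -- one minute in nanoseconds
buf:add(5 * minute, 1)
buf:add(5 * minute + 1e9, 1)    -- one second later: same minute, same row
buf:add(6 * minute, 1)          -- next minute: next row
print(buf:get(5 * minute), buf:get(6 * minute))
```

Two timestamps inside the same 60-second interval land in the same row, which is what lets the filter count messages simply by adding 1 per message.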
Process messages
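Each message that reaches the filter triggers a call to the process_message() function. <syntaxhighlight lang="lua">function process_message()
    -- Timestamp of the incoming message
    local ts = read_message("Timestamp")
    -- Add 1 to column 1 of the row that covers this timestamp
    data1:add(ts, 1, 1)
    return 0
end</syntaxhighlight>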
This function is very simple: it only populates the circular buffer data1.
What exactly it does:
- takes the timestamp of the message
- adds 1 to the row corresponding to that timestamp
As a result, 3 seconds after start the data1 circular buffer looks like this:
 data1
 +-----+-------+
 |Time | Data1 |
 +-----+-------+
 |-Ns  | nan   |
 |-N+1s| nan   |
 |...  | ...   |
 |-3s  | nan   |
 |-2s  | 121   |
 |-1s  | 120   |
 |Now()| 12    |
 +-----+-------+
The current (Now()) row holds an incomplete message count because the current second is still in progress. "nan" means "undefined value".
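The following sketch shows how an average over such a buffer can skip undefined rows and report how many rows were actually used. It is a plain-Lua imitation written for illustration, not the library's code: the avg_defined helper is hypothetical, and it assumes that "nan" rows are simply ignored. The sample values are taken from the table above.

```lua
local NAN = 0 / 0

-- NaN is the only value that is not equal to itself.
local function is_nan(x)
    return x ~= x
end

-- Average of the defined rows.
-- Returns the average and the number of rows used (NaN and 0 if none are defined).
local function avg_defined(rows)
    local sum, active = 0, 0
    for _, v in ipairs(rows) do
        if not is_nan(v) then
            sum = sum + v
            active = active + 1
        end
    end
    if active == 0 then
        return NAN, 0
    end
    return sum / active, active
end

-- Rows from oldest to newest: two undefined rows, then 121, 120 and the
-- still incomplete current row with 12 messages.
local rows = { NAN, NAN, 121, 120, 12 }
local avg, active_rows = avg_defined(rows)
print(avg, active_rows)
```

Note that the incomplete current row (12) drags the average well below the real rate of about 120 messages per row.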
Processing collected data
<syntaxhighlight lang="lua">function timer_event(ns)
    -- msg, debug_message, influxdb_message, alert_message, hostname,
    -- critical_delta and prev_time() are expected to be defined elsewhere
    -- in the filter; they are not shown in this excerpt.
    local Payload
    msg.Fields['lua_heartbeat_2'] = 'True'
    msg.Timestamp = ns
    inject_payload("cbuf_lua_test1", "", data1)
    avg, active_rows = data1:compute("avg", 1)
    if active_rows > 2 then
        -- avg_real is the average without the LAST sample,
        -- because the last value may not be fully populated yet.
        -- avg      = (a1 + a2 + ... + aN) / N
        -- avg_real = (a2 + ... + aN) / (N - 1)
        --          = (N * avg - a1) / (N - 1)
        --          = (avg - a1 / N) * (N / (N - 1))
        -- N  = active_rows
        -- a1 = data1:get(ns, 1) (last value)
        a1 = data1:get(ns, 1)
        if a1 == nil then
            -- nil means 'no messages' = 0 messages
            a1 = 0
        end
        N = active_rows
        avg_real = (avg - (a1 / N)) * (N / (N - 1))

        -- Compare the previous (complete) sample with the average
        current_percents = (data1:get(prev_time(1, ns, data1), 1) * 100) / avg_real
        delta_percents = math.abs(100 - current_percents)

        debug_message.Timestamp = ns
        debug_message.Fields['AVG_messages'] = avg
        debug_message.Fields['AVG_messages_real'] = avg_real
        debug_message.Fields['active_rows'] = active_rows
        debug_message.Fields['current_percents'] = current_percents
        debug_message.Fields['delta_percents'] = delta_percents
        debug_message.Fields['critical_delta'] = critical_delta
        debug_message.Fields['is_alert'] = 'False'
        inject_message(debug_message)

        influxdb_message.Timestamp = ns
        influxdb_message.Fields['payload_type'] = 'txt'
        influxdb_message.Fields['payload_name'] = 'influxdb'
        -- Time is in ms
        influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value=" .. avg_real .. " " .. math.floor(ns / 1000000) .. "\n"
        inject_message(influxdb_message)

        if delta_percents > critical_delta then
            alert_message.Timestamp = ns
            alert_message.Fields['AVG_messages'] = avg
            alert_message.Fields['AVG_messages_real'] = avg_real
            alert_message.Fields['active_rows'] = active_rows
            alert_message.Fields['current_percents'] = current_percents
            alert_message.Fields['delta_percents'] = delta_percents
            alert_message.Fields['critical_delta'] = critical_delta
            alert_message.Fields['is_alert'] = 'True'
            inject_message(alert_message)
        end
    end
    inject_message(msg)
    -- return 0
end</syntaxhighlight>
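The arithmetic in timer_event() can be checked in isolation. The sketch below is a standalone illustration, not part of the filter: avg_without_last and delta_percents are hypothetical helper names, and the sample values (12, 120, 121) are the ones from the buffer table earlier on this page. It removes the newest, possibly incomplete sample from the average and then compares the previous sample with that corrected average.

```lua
-- Average without the newest (possibly incomplete) sample.
-- avg: average over all n active rows, a1: newest sample, n: active rows (> 1)
local function avg_without_last(avg, a1, n)
    assert(n > 1, "need at least two active rows")
    return (avg - a1 / n) * (n / (n - 1))
end

-- Deviation, in percent, of a sample from the baseline.
local function delta_percents(sample, baseline)
    return math.abs(100 - sample * 100 / baseline)
end

local samples = { 12, 120, 121 }  -- a1 (newest, incomplete), a2, a3
local n = #samples
local avg = (samples[1] + samples[2] + samples[3]) / n

-- Same result as averaging a2 and a3 directly: (120 + 121) / 2
local avg_real = avg_without_last(avg, samples[1], n)

-- Compare the previous, complete sample with the corrected average
local delta = delta_percents(samples[2], avg_real)
local critical_delta = 1          -- percent, as in the filter config
print(avg_real, delta, delta > critical_delta)
```

With these numbers the corrected average is (120 + 121) / 2, and the previous sample deviates from it by less than one percent, so no alert would be raised with critical_delta = 1.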
Loopback error
With the following configuration the filter matches every message (message_matcher = "TRUE"), including the messages it injects itself:
 [heartbeat_filter_lua]
 type = "SandboxFilter"
 filename = "/usr/share/lma_collector/filters/afd_test2.lua"
 message_matcher = "TRUE"
 ticker_interval = 1
 
 [heartbeat_filter_lua.config]
 critical_delta = 1
Heka detects this circular reference and terminates the plugin:
 2016/02/03 18:39:40 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
 2016/02/03 18:39:41 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher)
 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped
 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.