Heka Filter afd example: различия между версиями
Sirmax (обсуждение | вклад) |
Sirmax (обсуждение | вклад) |
||
Строка 277: | Строка 277: | ||
More details about Grafana: |
More details about Grafana: |
||
http://plugins.mirantis.com/docs/i/n/influxdb_grafana/influxdb_grafana-0.7.0.pdf |
http://plugins.mirantis.com/docs/i/n/influxdb_grafana/influxdb_grafana-0.7.0.pdf |
||
+ | |||
+ | ==ElasticSearch== |
||
+ | |||
+ | ==Kibana== |
||
==Plugin code== |
==Plugin code== |
Версия 23:51, 7 февраля 2016
Heka Filter AFD example
Here is example of AFD (AnomalyFaultDetection) plugin.
The main idea plugin is:
- count number of incoming messages per second
- calculate average rate
- put this rate into InfluxDB and create chart in Grafana
Collecting data
To collect message rate we are using filter which called on each incoming message.
Each process_message() call just add 1 to circular buffer.
Cirular buffer
Heka's LUA SandBox has bilt-in Circular Buffer Library ehich can be used for data aggregation.
More details:
- https://github.com/mozilla-services/lua_circular_buffer/blob/master/README.md
- https://github.com/mozilla-services/lua_sandbox/
- http://hekad.readthedocs.org/en/latest/sandbox/
Circular buffer works like RRDTools (RRD is round-robin archive) but store data in RAM.
Simplest buffer created with <syntaxhighlight lang="lua">data1 = circular_buffer.new(60*24, 1, 60)</syntaxhighlight> looks like:
data1 +-----+-------+ |Time | Data1 | +-----+-------+ |-Ns | aN | |... | ... | |-2s | a3 | |-1s | a2 | |Now()| a1 | +-----+-------+
Arguments
- rows (unsigned) The number of rows in the buffer (must be > 1)
- columns (unsigned)The number of columns in the buffer (must be > 0)
- seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
- enable_delta (bool optional, default false) When true the changes made to the circular buffer between delta outputs are tracked.
So here we created buffer to save data during 1 day (60min*24h)
Process messages
Each message comes to filter is calling process_message() function. <syntaxhighlight lang="lua">function process_message()
local ts = read_message("Timestamp") data1:add(ts, 1, 1) return 0
end</syntaxhighlight>
This function is very simple, it only populates circular buffer data1.
What exactly it does:
- take timestamp of message
- add 1 to raw corresponding to the timestamp
As result in 3 seconds after start data1 circular buffer looks like
data1 +-----+-------+ |Time | Data1 | +-----+-------+ |-Ns | nan | |-N+1s| nan | |... | ... | |-3s | nan | |-2s | 121 | |-1s | 120 | |Now()| 12 | +-----+-------+
in current (now()) raw we have invalid number of messages because "current second" is still "in progress". "nan" means "undefined value"
Processing collected data
The main data processing is doing timer_event(ns) function.
This function is called every ticker_interval
This function is doing the following:
- log data1 circular buffer for debug with inject_payload() call. Logged message looks like:
:Timestamp: 2016-02-05 12:45:31.985034391 +0000 UTC :Type: heka.sandbox-output :Hostname: node-6 :Pid: 26318 :Uuid: 6457c1de-87ef-4069-ac57-051ecdb73bca :Logger: heartbeat_filter_lua :Payload: {"time":1454589960,"rows":1440,"columns":1,"seconds_per_row":60,"column_info":[{"name":"Messages","unit":"count","aggregation":"sum"}]} nan nan nan <SKIP> 85 86 51 :EnvVersion: :Severity: 7 :Fields: | name:"payload_type" type:string value:"cbuf_lua_test1" representation:"file-extension" | name:"payload_name" type:string value:""
So you can see collected data in :Payload field
- Calculates average messages rate. It may looks a little bit complicated, but please pay your attention: we DO NOT collect all messages in "currently running" second.
So instead of average = (a1+ ..+aN ) /N we need (a2+ .. +aN)/(N-1)
- Calculate difference in percents with Current messages rate and average and create alarm if needed. (For future use for alarming)
- Create debug message.
- Create message to be processed with influxdb output
function prev_time() calculates "time some steps ago" in circular buffer format. We need it because circular buffer library uses second,and time stamp is nano-seconds.
<syntaxhighlight lang="lua">function timer_event(ns)
local Payload msg.Fields['lua_heartbeat_2'] = 'True' msg.Timestamp = ns inject_payload("cbuf_lua_test1", "", data1) avg, active_rows = data1:compute("avg", 1) if active_rows > 2 then
-- avg_real - avg without LAST sample -- avg = ( a1 + a2 + ... + aN ) / N -- We need to find ( a2 + ... +aN ) / (N -1) because last value may be not populated now --. --- avg_real = (a1 + a2 + ... + aN - a1/N ) * ( N/(N-1) ) -- a1 + .. + aN = avg, so: -- avg_real = ( avg - a1/N ) * ( N/( N - 1 ) ) -- N = active_rows -- a1 = data1:get(ns, 1) (Last value)
a1 = data1:get(ns, 1) if a1 == nil then
-- nil means 'no messages' = 0 messages
a1 = 0 end
N = active_rows
.
avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real delta_percents = math.abs(100 - current_percents )
debug_message.Timestamp = ns debug_message.Fields['AVG_messages'] = avg debug_message.Fields['AVG_messages_real'] = avg_real debug_message.Fields['active_rows'] = active_rows debug_message.Fields['current_percents'] = current_percents debug_message.Fields['delta_percents'] = delta_percents debug_message.Fields['critical_delta'] = critical_delta debug_message.Fields['is_alert'] = 'False' inject_message(debug_message)
influxdb_message.Timestamp = ns influxdb_message.Fields['payload_type'] = 'txt' influxdb_message.Fields['payload_name'] = 'influxdb'
-- Time is in ms
influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000) .. "\n"
inject_message(influxdb_message)
if delta_percents > critical_delta then alert_message.Timestamp = ns alert_message.Fields['AVG_messages'] = avg alert_message.Fields['AVG_messages_real'] = avg_real alert_message.Fields['active_rows'] = active_rows alert_message.Fields['current_percents'] = current_percents alert_message.Fields['delta_percents'] = delta_percents alert_message.Fields['critical_delta'] = critical_delta alert_message.Fields['is_alert'] = 'True' inject_message(alert_message) end end inject_message(msg)
-- return 0 end</syntaxhighlight> Messages generated with inject_message(influxdb_message) call will be processed by inluxdb-output. These messages looks like:
:Timestamp: 2016-02-05 08:00:03.152148992 +0000 UTC :Type: heka.sandbox.HEARTBEAT :Hostname: node-6 :Pid: 0 :Uuid: b3b3319c-0db6-47fa-a2d5-55877d489156 :Logger: heartbeat_filter_lua :Payload: AVG_messages_real,deployment_id=3,hostname=node-6 value=51 1454659203152 :EnvVersion: :Severity: 6 :Fields: | name:"payload_type" type:string value:"txt" | name:"payload_name" type:string value:"influxdb"
Plugin Configuration
I use the following plugin configuration:
[heartbeat_filter_lua] type = "SandboxFilter" filename = "/usr/share/lma_collector/filters/afd_test2.lua" message_matcher = "Type !~ /HEARTBEAT/ && Fields[payload_type] !~ /cbuf_lua_test1/" #ticker_interval = 120 #preserve_data = false ticker_interval = 1 [heartbeat_filter_lua.config] critical_delta = 1 hostname = "node-6"
Loopback error
Please pay your attention - there is a possibility to create endless loop.
So in case we use message_matcher = "TRUE" (means "all messages") we will create endless loop because any message created by filter triggers filter call again.
[heartbeat_filter_lua] type = "SandboxFilter" filename = "/usr/share/lma_collector/filters/afd_test2.lua" message_matcher = "TRUE" ticker_interval = 1 [heartbeat_filter_lua.config] critical_delta = 1
2016/02/03 18:39:40 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe 2016/02/03 18:39:41 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher) 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped 2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.
InfluxDB
Next step is check data in InfluxDB. Siplest way is using ipython and influxdb python client (http://influxdb-python.readthedocs.org/en/latest/include-readme.html):
import influxdb client = influxdb.InfluxDBClient("<elastic search server's IP addrtess>", "8086", "lma_user", "lma_password") client.get_list_database() Out[41]: [{u'name': u'_internal'}, {u'name': u'lma'}] query = 'select value from AVG_messages_real' result = client.query(query) for i in result: print(json.dumps(i, sort_keys=True, indent=4, separators=(',', ': ') ) )
Output:
[ { "time": "2016-02-05T08:00:02.148Z", "value": 51 }, { "time": "2016-02-05T08:00:03.152Z", "value": 51 }, { "time": "2016-02-05T08:00:04.151Z", "value": 51 } ]
Grafana
Now we can create simple chart in Grafana:
As you can see on screenshot we use following request to build chart:
SELECT mean("value") AS "value" FROM "AVG_messages_real" WHERE "hostname" = 'node-6' AND "deployment_id" = '3' AND $timeFilter GROUP BY time(10s)
More details about Grafana: http://plugins.mirantis.com/docs/i/n/influxdb_grafana/influxdb_grafana-0.7.0.pdf
ElasticSearch
Kibana
Plugin code
<syntaxhighlight lang="lua">require "string" require "math"
require "circular_buffer"
-- 60 min * 24h = 1 day, 1 row 60 sec per row
data1 = circular_buffer.new(60*24, 1, 60)
local COUNT1 = data1:set_header(1, "Messages", "count", "sum")
local critical_delta = read_config('critical_delta') or error('critical_delta must be specified!')
local hostname = read_config('hostname') or error('hostname must be specified!')
local c=0
local msg = {
Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}
}
local debug_message = {
Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}
}
local alert_message = {
Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}
}
local influxdb_message = {
Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}, Payload = ""
}
function prev_time(step, ts, cbuf)
rows, columns, seconds_per_row = cbuf:get_configuration() return ts - (1000000000 * seconds_per_row * step)
end
function process_message()
local ts = read_message("Timestamp") data1:add(ts, 1, 1) return 0
end
function timer_event(ns)
local Payload msg.Fields['lua_heartbeat_2'] = 'True' msg.Timestamp = ns inject_payload("cbuf_lua_test1", "", data1) avg, active_rows = data1:compute("avg", 1) if active_rows > 2 then
-- avg_real - avg without LAST sample -- avg = ( a1 + a2 + ... + aN ) / N -- We need to find ( a2 + ... +aN ) / (N -1) because last value may be not populated now -- --- avg_real = (a1 + a2 + ... + aN - a1/N ) * ( N/(N-1) ) -- a1 + .. + aN = avg, so: -- avg_real = ( avg - a1/N ) * ( N/( N - 1 ) ) -- N = active_rows -- a1 = data1:get(ns, 1) (Last value)
a1 = data1:get(ns, 1) if a1 == nil then
-- nil means 'no messages' = 0 messages
a1 = 0 end
N = active_rows
avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real delta_percents = math.abs(100 - current_percents )
debug_message.Timestamp = ns debug_message.Fields['AVG_messages'] = avg debug_message.Fields['AVG_messages_real'] = avg_real debug_message.Fields['active_rows'] = active_rows debug_message.Fields['current_percents'] = current_percents debug_message.Fields['delta_percents'] = delta_percents debug_message.Fields['critical_delta'] = critical_delta debug_message.Fields['is_alert'] = 'False' inject_message(debug_message)
influxdb_message.Timestamp = ns influxdb_message.Fields['payload_type'] = 'txt' influxdb_message.Fields['payload_name'] = 'influxdb'
-- Time is in ms
influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000) .. "\n"
inject_message(influxdb_message)
if delta_percents > critical_delta then alert_message.Timestamp = ns alert_message.Fields['AVG_messages'] = avg alert_message.Fields['AVG_messages_real'] = avg_real alert_message.Fields['active_rows'] = active_rows alert_message.Fields['current_percents'] = current_percents alert_message.Fields['delta_percents'] = delta_percents alert_message.Fields['critical_delta'] = critical_delta alert_message.Fields['is_alert'] = 'True' inject_message(alert_message) end end inject_message(msg)
-- return 0 end</syntaxhighlight>