Revision as of 23:58, 7 February 2016
Heka Filter AFD example
Here is an example of an AFD (Anomaly Fault Detection) plugin.
The main idea of the plugin is to:
- count the number of incoming messages per second
- calculate the average rate
- put this rate into InfluxDB and build a chart in Grafana
Collecting data
To collect the message rate we use a filter which is called on each incoming message.
Each process_message() call simply adds 1 to a circular buffer.
Circular buffer
Heka's Lua sandbox has a built-in circular buffer library which can be used for data aggregation.
More details:
- https://github.com/mozilla-services/lua_circular_buffer/blob/master/README.md
- https://github.com/mozilla-services/lua_sandbox/
- http://hekad.readthedocs.org/en/latest/sandbox/
A circular buffer works like RRDtool (RRD stands for round-robin database) but stores its data in RAM.
The simplest buffer, created with <syntaxhighlight lang="lua">data1 = circular_buffer.new(60*24, 1, 60)</syntaxhighlight>, looks like:
<PRE>
data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  | aN    |
|...  | ...   |
|-2s  | a3    |
|-1s  | a2    |
|Now()| a1    |
+-----+-------+
</PRE>
Arguments
- rows (unsigned) The number of rows in the buffer (must be > 1)
- columns (unsigned) The number of columns in the buffer (must be > 0)
- seconds_per_row (unsigned) The number of seconds each row represents (must be > 0).
- enable_delta (bool optional, default false) When true the changes made to the circular buffer between delta outputs are tracked.
So here we created a buffer that keeps 1 day of data: 60*24 = 1440 rows, each covering 60 seconds (1440 * 60 s = 24 h).
Process messages
Each message that reaches the filter triggers the process_message() function. <syntaxhighlight lang="lua">function process_message()
    local ts = read_message("Timestamp")
    data1:add(ts, 1, 1)
    return 0
end</syntaxhighlight>
This function is very simple; it only populates the data1 circular buffer.
What exactly it does:
- takes the timestamp of the message
- adds 1 to the row corresponding to that timestamp
As a result, 3 seconds after start the data1 circular buffer looks like:
<PRE>
data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  | nan   |
|-N+1s| nan   |
|...  | ...   |
|-3s  | nan   |
|-2s  | 121   |
|-1s  | 120   |
|Now()| 12    |
+-----+-------+
</PRE>
In the current (now()) row we have an invalid number of messages, because the "current second" is still "in progress". "nan" means "undefined value".
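The row-per-time-slot behaviour described above can be sketched in Python. This is a hypothetical minimal model for illustration only, not Heka's actual Lua circular_buffer API (the class name and methods are invented):

```python
class RingBuffer:
    """Minimal sketch of the circular-buffer idea: a fixed number of rows,
    each covering seconds_per_row seconds of wall-clock time.
    Hypothetical model, not Heka's Lua circular_buffer API."""

    def __init__(self, rows, seconds_per_row):
        self.rows = rows
        self.spr = seconds_per_row
        self.data = [float('nan')] * rows  # nan = "undefined value"

    def _row(self, ts_ns):
        # Timestamps are nanoseconds; map them onto a row index.
        return (ts_ns // (self.spr * 10**9)) % self.rows

    def add(self, ts_ns, value):
        row = self._row(ts_ns)
        if self.data[row] != self.data[row]:  # nan check: fresh slot
            self.data[row] = 0.0
        self.data[row] += value

buf = RingBuffer(rows=60 * 24, seconds_per_row=60)
ts = 1454659200 * 10**9
buf.add(ts, 1)
buf.add(ts + 10**9, 1)  # one second later: lands in the same 60-second row
# → the shared row now holds 2.0
```

Unlike the sketch, the real library also zeroes out stale rows when the buffer wraps around after a full day.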
Processing collected data
The main data processing is done by the timer_event(ns) function.
This function is called every ticker_interval.
It does the following:
- logs the data1 circular buffer for debugging with an inject_payload() call. The logged message looks like:
<PRE>
:Timestamp: 2016-02-05 12:45:31.985034391 +0000 UTC
:Type: heka.sandbox-output
:Hostname: node-6
:Pid: 26318
:Uuid: 6457c1de-87ef-4069-ac57-051ecdb73bca
:Logger: heartbeat_filter_lua
:Payload: {"time":1454589960,"rows":1440,"columns":1,"seconds_per_row":60,"column_info":[{"name":"Messages","unit":"count","aggregation":"sum"}]} nan nan nan <SKIP> 85 86 51
:EnvVersion:
:Severity: 7
:Fields: | name:"payload_type" type:string value:"cbuf_lua_test1" representation:"file-extension" | name:"payload_name" type:string value:""
</PRE>
So you can see the collected data in the :Payload field
- calculates the average message rate. This may look a little complicated, but pay attention: we DO NOT yet have all the messages for the "currently running" second.
So instead of average = (a1 + ... + aN) / N we need (a2 + ... + aN) / (N-1)
- calculates the difference in percent between the current message rate and the average, and creates an alarm if needed (for future use in alerting)
- creates a debug message
- creates a message to be processed by the InfluxDB output
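The averaging trick above can be checked with a small worked example in Python (the sample counts are made up):

```python
# Newest-first per-second counts; a1 is the current, still-incomplete second.
counts = [12, 120, 121, 119, 120]        # a1 .. a5
N = len(counts)
avg = sum(counts) / N                    # naive average, dragged down by a1
a1 = counts[0]

# Same algebra as the plugin uses:
# (a2 + ... + aN) / (N-1) == (avg - a1/N) * (N / (N-1))
avg_real = (avg - a1 / N) * (N / (N - 1))

print(avg)       # 98.4  (misleadingly low)
print(avg_real)  # 120.0 (average of the 4 complete seconds)
```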
The function prev_time() calculates the time "some steps ago" in circular buffer terms. We need it because the circular buffer library works in seconds, while the message timestamp is in nanoseconds.
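The same seconds-to-nanoseconds arithmetic, sketched in Python (this mirrors the Lua helper; the seconds_per_row default of 60 matches the buffer created above):

```python
NS_PER_SECOND = 10**9

def prev_time(step, ts_ns, seconds_per_row=60):
    # Step back `step` buffer rows; timestamps are nanoseconds,
    # each row is seconds_per_row seconds wide.
    return ts_ns - NS_PER_SECOND * seconds_per_row * step

ts = 1454659203152148992        # nanosecond timestamp from the example message
print(prev_time(1, ts))         # exactly 60 s (one row) earlier
```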
<syntaxhighlight lang="lua">function timer_event(ns)
    local Payload
    msg.Fields['lua_heartbeat_2'] = 'True'
    msg.Timestamp = ns
    inject_payload("cbuf_lua_test1", "", data1)
    avg, active_rows = data1:compute("avg", 1)
    if active_rows > 2 then
        -- avg_real = avg without the LAST sample.
        -- avg = ( a1 + a2 + ... + aN ) / N
        -- We need ( a2 + ... + aN ) / (N - 1) because the last value
        -- may not be fully populated yet:
        -- avg_real = ( a1 + a2 + ... + aN - a1 ) / ( N - 1 )
        --          = ( avg - a1/N ) * ( N/(N - 1) )
        -- N  = active_rows
        -- a1 = data1:get(ns, 1) (last value)
        a1 = data1:get(ns, 1)
        if a1 == nil then
            -- nil means 'no messages' = 0 messages
            a1 = 0
        end
        N = active_rows
        avg_real = ( avg - (a1/active_rows) ) * ( N/(N-1) )
        current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
        delta_percents = math.abs(100 - current_percents)

        debug_message.Timestamp = ns
        debug_message.Fields['AVG_messages'] = avg
        debug_message.Fields['AVG_messages_real'] = avg_real
        debug_message.Fields['active_rows'] = active_rows
        debug_message.Fields['current_percents'] = current_percents
        debug_message.Fields['delta_percents'] = delta_percents
        debug_message.Fields['critical_delta'] = critical_delta
        debug_message.Fields['is_alert'] = 'False'
        inject_message(debug_message)

        influxdb_message.Timestamp = ns
        influxdb_message.Fields['payload_type'] = 'txt'
        influxdb_message.Fields['payload_name'] = 'influxdb'
        -- Time is in ms
        influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value=" .. avg_real .. " " .. math.floor(ns/1000000) .. "\n"
        inject_message(influxdb_message)

        if delta_percents > critical_delta then
            alert_message.Timestamp = ns
            alert_message.Fields['AVG_messages'] = avg
            alert_message.Fields['AVG_messages_real'] = avg_real
            alert_message.Fields['active_rows'] = active_rows
            alert_message.Fields['current_percents'] = current_percents
            alert_message.Fields['delta_percents'] = delta_percents
            alert_message.Fields['critical_delta'] = critical_delta
            alert_message.Fields['is_alert'] = 'True'
            inject_message(alert_message)
        end
    end
    inject_message(msg)
    -- return 0
end</syntaxhighlight>
Messages generated with the inject_message(influxdb_message) call will be processed by the InfluxDB output. These messages look like:
<PRE>
:Timestamp: 2016-02-05 08:00:03.152148992 +0000 UTC
:Type: heka.sandbox.HEARTBEAT
:Hostname: node-6
:Pid: 0
:Uuid: b3b3319c-0db6-47fa-a2d5-55877d489156
:Logger: heartbeat_filter_lua
:Payload: AVG_messages_real,deployment_id=3,hostname=node-6 value=51 1454659203152
:EnvVersion:
:Severity: 6
:Fields: | name:"payload_type" type:string value:"txt" | name:"payload_name" type:string value:"influxdb"
</PRE>
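The :Payload here is InfluxDB line protocol: measurement, comma-separated tags, a space, the field set, a space, and the timestamp. Building the same string in Python, including the nanosecond-to-millisecond conversion the filter performs (the tag values are taken from the example above):

```python
hostname = "node-6"
avg_real = 51
ns = 1454659203152148992   # message Timestamp in nanoseconds

# measurement,tag=...,tag=... field=value timestamp-in-ms (as in the filter)
payload = "AVG_messages_real,deployment_id=3,hostname=%s value=%s %d\n" % (
    hostname, avg_real, ns // 1000000)

print(payload)
# AVG_messages_real,deployment_id=3,hostname=node-6 value=51 1454659203152
```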
Plugin Configuration
I use the following plugin configuration:
<PRE>
[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "Type !~ /HEARTBEAT/ && Fields[payload_type] !~ /cbuf_lua_test1/"
#ticker_interval = 120
#preserve_data = false
ticker_interval = 1

[heartbeat_filter_lua.config]
critical_delta = 1
hostname = "node-6"
</PRE>
Loopback error
Please note: it is possible to create an endless loop here.
If we use message_matcher = "TRUE" (meaning "all messages"), we create an endless loop, because any message created by the filter triggers the filter again:
<PRE>
[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "TRUE"
ticker_interval = 1

[heartbeat_filter_lua.config]
critical_delta = 1
</PRE>
Heka detects the circular reference and terminates the plugin:
<PRE>
2016/02/03 18:39:40 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'aggregator_tcpoutput' error: writing to 192.168.0.7:5565: write tcp 192.168.0.7:5565: broken pipe
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher)
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.
</PRE>
InfluxDB
The next step is to check the data in InfluxDB. The simplest way is to use ipython and the InfluxDB python client (http://influxdb-python.readthedocs.org/en/latest/include-readme.html):
<syntaxhighlight lang="python">import json

import influxdb

client = influxdb.InfluxDBClient("<InfluxDB server's IP address>", "8086", "lma_user", "lma_password")
client.get_list_database()
# Out[41]: [{u'name': u'_internal'}, {u'name': u'lma'}]
query = 'select value from AVG_messages_real'
result = client.query(query)
for i in result:
    print(json.dumps(i, sort_keys=True, indent=4, separators=(',', ': ')))</syntaxhighlight>
Output:
<PRE>
[
    {
        "time": "2016-02-05T08:00:02.148Z",
        "value": 51
    },
    {
        "time": "2016-02-05T08:00:03.152Z",
        "value": 51
    },
    {
        "time": "2016-02-05T08:00:04.151Z",
        "value": 51
    }
]
</PRE>
Grafana
Now we can create a simple chart in Grafana.
As you can see on the screenshot, we use the following query to build the chart:
<PRE>
SELECT mean("value") AS "value" FROM "AVG_messages_real" WHERE "hostname" = 'node-6' AND "deployment_id" = '3' AND $timeFilter GROUP BY time(10s)
</PRE>
More details about Grafana: http://plugins.mirantis.com/docs/i/n/influxdb_grafana/influxdb_grafana-0.7.0.pdf
ElasticSearch
It is also possible to send data to ElasticSearch.
We need this only if we would like to save logs. The test plugin does not generate any logs, but it is still possible to send its messages to ElasticSearch and see them in a Kibana dashboard.
First we need to check the ElasticSearch output plugin configuration:
<PRE>
[elasticsearch_output]
type = "ElasticSearchOutput"
message_matcher = "Type == 'log' || Type == 'notification'"
encoder = "elasticsearch_encoder"
flush_interval = 5000
flush_count = 10
server = "http://192.168.0.3:9200"
use_buffering = false
</PRE>
As we can see, this message_matcher will not match any of the messages generated by the plugin:
all of them have Type="heka.sandbox.HEARTBEAT".
So the first step is to modify message_matcher in the ElasticSearch output plugin configuration:
<PRE>
message_matcher = "Type == 'log' || Type == 'notification' || Type =~ /heartbeat/"
</PRE>
The Type of the message is used for the index name, so it must be lower case. The second step is therefore to modify the plugin code to use lower case in the Type field and to add additional fields.
Kibana
Plugin code
<syntaxhighlight lang="lua">require "string"
require "math"
require "circular_buffer"

-- 60 min * 24 h = 1 day, 1 column, 60 sec per row
data1 = circular_buffer.new(60*24, 1, 60)
local COUNT1 = data1:set_header(1, "Messages", "count", "sum")

local critical_delta = read_config('critical_delta') or error('critical_delta must be specified!')
local hostname = read_config('hostname') or error('hostname must be specified!')
local c = 0

local msg = {
    Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}
}
local debug_message = {
    Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}
}
local alert_message = {
    Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}
}
local influxdb_message = {
    Type = "HEARTBEAT", Timestamp = nil, Severity = 6, Fields = {}, Payload = ""
}

function prev_time(step, ts, cbuf)
    rows, columns, seconds_per_row = cbuf:get_configuration()
    return ts - (1000000000 * seconds_per_row * step)
end

function process_message()
    local ts = read_message("Timestamp")
    data1:add(ts, 1, 1)
    return 0
end

function timer_event(ns)
    local Payload
    msg.Fields['lua_heartbeat_2'] = 'True'
    msg.Timestamp = ns
    inject_payload("cbuf_lua_test1", "", data1)
    avg, active_rows = data1:compute("avg", 1)
    if active_rows > 2 then
        -- avg_real = avg without the LAST sample.
        -- avg = ( a1 + a2 + ... + aN ) / N
        -- We need ( a2 + ... + aN ) / (N - 1) because the last value
        -- may not be fully populated yet:
        -- avg_real = ( a1 + a2 + ... + aN - a1 ) / ( N - 1 )
        --          = ( avg - a1/N ) * ( N/(N - 1) )
        -- N  = active_rows
        -- a1 = data1:get(ns, 1) (last value)
        a1 = data1:get(ns, 1)
        if a1 == nil then
            -- nil means 'no messages' = 0 messages
            a1 = 0
        end
        N = active_rows
        avg_real = ( avg - (a1/active_rows) ) * ( N/(N-1) )
        current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
        delta_percents = math.abs(100 - current_percents)

        debug_message.Timestamp = ns
        debug_message.Fields['AVG_messages'] = avg
        debug_message.Fields['AVG_messages_real'] = avg_real
        debug_message.Fields['active_rows'] = active_rows
        debug_message.Fields['current_percents'] = current_percents
        debug_message.Fields['delta_percents'] = delta_percents
        debug_message.Fields['critical_delta'] = critical_delta
        debug_message.Fields['is_alert'] = 'False'
        inject_message(debug_message)

        influxdb_message.Timestamp = ns
        influxdb_message.Fields['payload_type'] = 'txt'
        influxdb_message.Fields['payload_name'] = 'influxdb'
        -- Time is in ms
        influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value=" .. avg_real .. " " .. math.floor(ns/1000000) .. "\n"
        inject_message(influxdb_message)

        if delta_percents > critical_delta then
            alert_message.Timestamp = ns
            alert_message.Fields['AVG_messages'] = avg
            alert_message.Fields['AVG_messages_real'] = avg_real
            alert_message.Fields['active_rows'] = active_rows
            alert_message.Fields['current_percents'] = current_percents
            alert_message.Fields['delta_percents'] = delta_percents
            alert_message.Fields['critical_delta'] = critical_delta
            alert_message.Fields['is_alert'] = 'True'
            inject_message(alert_message)
        end
    end
    inject_message(msg)
    -- return 0
end</syntaxhighlight>