Heka Filter AFD example



Here is an example of an AFD (Anomaly and Fault Detection) plugin.
The main idea of the plugin is to:

  • count the number of incoming messages per time interval (one circular buffer row)
  • calculate the average rate
  • put this rate into InfluxDB and create a chart in Grafana


Collecting data

To collect the message rate we use a filter that is called for each incoming message.
Each process_message() call just adds 1 to a circular buffer.

Circular buffer

Heka's Lua sandbox has a built-in circular buffer library which can be used for data aggregation.
More details:


The circular buffer works like RRDtool (RRD stands for round-robin database) but stores its data in RAM.

The simplest buffer, created with
data1 = circular_buffer.new(60*24, 1, 60)
looks like:
data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  aN   |
|...  | ...   |
|-2s  |  a3   |
|-1s  |  a2   |
|Now()|  a1   |
+-----+-------+


Arguments

  • rows (unsigned): the number of rows in the buffer (must be > 1)
  • columns (unsigned): the number of columns in the buffer (must be > 0)
  • seconds_per_row (unsigned): the number of seconds each row represents (must be > 0)
  • enable_delta (bool, optional, default false): when true, the changes made to the circular buffer between delta outputs are tracked

So here we create a buffer that keeps one day of data: 60*24 = 1440 rows (60 min * 24 h), one column, 60 seconds per row.

Process messages

For each message that reaches the filter, Heka calls the process_message() function.

function process_message()
    local ts = read_message("Timestamp")
    data1:add(ts, 1, 1)
    return 0
end

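This function is very simple: it only populates the circular buffer data1.
What exactly it does:

  • takes the timestamp of the message
  • adds 1 to the row corresponding to that timestamp

As a result, about three row intervals after start the data1 circular buffer looks like this (the time labels are schematic: with seconds_per_row = 60 each row covers 60 seconds):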

data1
+-----+-------+
|Time | Data1 |
+-----+-------+
|-Ns  |  nan  |
|-N+1s|  nan  |
|...  | ...   |
|-3s  |  nan  |
|-2s  |  121  |
|-1s  |  120  |
|Now()|  12   |
+-----+-------+

The current (Now()) row holds an incomplete message count because the current interval is still in progress. "nan" means "undefined value": nothing has been written to that row yet.
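
The counting behaviour described above can be tried outside Heka. The following Python sketch is only a simplified stand-in for the sandbox's circular buffer, not its actual implementation: the class name RingBuffer and its methods are hypothetical, and it imitates add() and get() for a single column only.

```python
import math

NS_PER_SEC = 1_000_000_000


class RingBuffer:
    """Toy single-column model of a time-indexed circular buffer (hypothetical)."""

    def __init__(self, rows, seconds_per_row):
        self.rows = rows
        self.seconds_per_row = seconds_per_row
        self.values = [math.nan] * rows   # unset rows hold NaN ("nan" in the dump)
        self.newest = None                # absolute row number of the newest row

    def _row(self, ts_ns):
        # Convert a nanosecond timestamp to an absolute row number.
        return ts_ns // (NS_PER_SEC * self.seconds_per_row)

    def add(self, ts_ns, value):
        row = self._row(ts_ns)
        if self.newest is None:
            self.newest = row
        if row > self.newest:
            # Time moved forward: rows that are reused are reset to NaN first.
            for r in range(self.newest + 1, row + 1):
                self.values[r % self.rows] = math.nan
            self.newest = row
        if row <= self.newest - self.rows:
            return None                   # too old, outside the buffer
        slot = row % self.rows
        current = self.values[slot]
        self.values[slot] = value if math.isnan(current) else current + value
        return self.values[slot]

    def get(self, ts_ns):
        row = self._row(ts_ns)
        if self.newest is None or row > self.newest or row <= self.newest - self.rows:
            return None                   # outside the time range of the buffer
        return self.values[row % self.rows]


buf = RingBuffer(rows=1440, seconds_per_row=60)
for _ in range(120):
    buf.add(0, 1)                         # 120 messages in the first row
for _ in range(12):
    buf.add(60 * NS_PER_SEC, 1)           # 12 messages so far in the current row
```

After this run the first row holds 120 and the current row holds 12, which is the "incomplete newest row" situation shown in the table.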

Processing collected data

The main data processing is done by the timer_event(ns) function, which Heka calls every ticker_interval seconds.

This function does the following:

  • Logs the data1 circular buffer for debugging with an inject_payload() call. The logged message looks like:
:Timestamp: 2016-02-05 12:45:31.985034391 +0000 UTC
:Type: heka.sandbox-output
:Hostname: node-6
:Pid: 26318
:Uuid: 6457c1de-87ef-4069-ac57-051ecdb73bca
:Logger: heartbeat_filter_lua
:Payload: {"time":1454589960,"rows":1440,"columns":1,"seconds_per_row":60,"column_info":[{"name":"Messages","unit":"count","aggregation":"sum"}]}
nan
nan
nan
<SKIP>
85
86
51

:EnvVersion:
:Severity: 7
:Fields:
    | name:"payload_type" type:string value:"cbuf_lua_test1" representation:"file-extension"
    | name:"payload_name" type:string value:""

So you can see the collected data in the :Payload field.

  • Calculates the average message rate. It may look a little complicated, but note that the row for the interval currently in progress does NOT yet contain all of its messages, so it must be excluded.

So instead of average = (a1 + ... + aN)/N we need (a2 + ... + aN)/(N-1), where a1 is the newest (incomplete) row.

  • Calculates the difference, in percent, between the most recent complete row and the average, and creates an alert message if it exceeds critical_delta. (Reserved for future alerting.)
  • Creates a debug message.
  • Creates a message to be processed by the InfluxDB output.

The prev_time() function returns the timestamp that lies a given number of rows back in the circular buffer. We need it because the buffer's row length is configured in seconds while timestamps are in nanoseconds.
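
The arithmetic can be checked with a few lines of Python. This is only a sketch of the formula used by the filter, not the filter itself; the function name avg_without_newest and the sample values are made up for illustration (the values are borrowed from the example table earlier on this page).

```python
import math

NS_PER_SEC = 1_000_000_000


def avg_without_newest(avg, newest, n):
    """Mean of the n-1 older samples, given the mean of all n samples and the newest one."""
    return (avg - newest / n) * (n / (n - 1))


def prev_time(step, ts_ns, seconds_per_row):
    """Timestamp (in ns) that lies `step` rows before ts_ns."""
    return ts_ns - NS_PER_SEC * seconds_per_row * step


samples = [12, 120, 121]                 # a1 (incomplete newest row), a2, a3
n = len(samples)
avg = sum(samples) / n                   # mean over all rows, including a1
avg_real = avg_without_newest(avg, samples[0], n)   # 120.5 up to rounding

# Compare the most recent complete row (a2) with the corrected average.
current_percents = samples[1] * 100 / avg_real
delta_percents = abs(100 - current_percents)
```

The corrected value equals the plain mean of a2 and a3, which is what the filter needs: the incomplete row no longer drags the average down.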


function timer_event(ns)
  local Payload
  msg.Fields['lua_heartbeat_2'] = 'True'
  msg.Timestamp = ns
  inject_payload("cbuf_lua_test1", "", data1)
  avg, active_rows = data1:compute("avg", 1)
  if active_rows > 2 then
--  avg_real - avg without the LAST (newest) sample
--  avg = ( a1 + a2 + ... + aN ) / N
--  We need to find ( a2 + ... + aN ) / (N - 1) because the last value may not be fully populated yet
--
--  avg_real = ( (a1 + a2 + ... + aN)/N - a1/N ) * ( N/(N-1) )
--  (a1 + ... + aN)/N = avg, so:
--  avg_real = ( avg - a1/N ) * ( N/(N-1) )
--  N = active_rows
--  a1 = data1:get(ns, 1) (last value)
 
    a1 = data1:get(ns, 1)
    if a1 == nil then
--  nil means 'no messages' = 0 messages
      a1 = 0
    end
 
    N = active_rows
 
    avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
 
    current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
    delta_percents = math.abs(100 - current_percents )
 
    debug_message.Timestamp = ns
    debug_message.Fields['AVG_messages'] = avg
    debug_message.Fields['AVG_messages_real'] = avg_real
    debug_message.Fields['active_rows'] = active_rows
    debug_message.Fields['current_percents'] = current_percents
    debug_message.Fields['delta_percents']   = delta_percents
    debug_message.Fields['critical_delta']   = critical_delta
    debug_message.Fields['is_alert']         = 'False'
    inject_message(debug_message)
 
    influxdb_message.Timestamp = ns
    influxdb_message.Fields['payload_type'] = 'txt'
    influxdb_message.Fields['payload_name'] = 'influxdb'
-- Time is in ms
    influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000)  .. "\n"
 
    inject_message(influxdb_message)
 
    if delta_percents > critical_delta then
      alert_message.Timestamp = ns
      alert_message.Fields['AVG_messages']       = avg
      alert_message.Fields['AVG_messages_real']  = avg_real
      alert_message.Fields['active_rows']      = active_rows
      alert_message.Fields['current_percents'] = current_percents
      alert_message.Fields['delta_percents']   = delta_percents
      alert_message.Fields['critical_delta']   = critical_delta
      alert_message.Fields['is_alert']         = 'True'
      inject_message(alert_message)
    end
  end
 inject_message(msg)
--    return 0
end

Messages generated by the inject_message(influxdb_message) call will be processed by the InfluxDB output. These messages look like:

:Timestamp: 2016-02-05 08:00:03.152148992 +0000 UTC
:Type: heka.sandbox.HEARTBEAT
:Hostname: node-6
:Pid: 0
:Uuid: b3b3319c-0db6-47fa-a2d5-55877d489156
:Logger: heartbeat_filter_lua
:Payload: AVG_messages_real,deployment_id=3,hostname=node-6 value=51 1454659203152

:EnvVersion:
:Severity: 6
:Fields:
    | name:"payload_type" type:string value:"txt"
    | name:"payload_name" type:string value:"influxdb"
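
The payload format can be reproduced in a few lines of Python. This is a minimal sketch that mirrors the string concatenation done in timer_event(); the function name influx_line is hypothetical and the values are taken from the message shown above.

```python
def influx_line(measurement, tags, value, ts_ns):
    """Build one line: measurement,tag=val,... value=<value> <timestamp in ms>."""
    tag_str = ",".join(f"{key}={val}" for key, val in tags)
    ts_ms = ts_ns // 1_000_000           # nanoseconds -> milliseconds
    return f"{measurement},{tag_str} value={value} {ts_ms}\n"


line = influx_line(
    "AVG_messages_real",
    [("deployment_id", "3"), ("hostname", "node-6")],
    51,
    1454659203152148992,                 # message Timestamp in nanoseconds
)
```

The resulting line has the same shape as the :Payload field above: measurement and tags, then the value, then the timestamp in milliseconds.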

Plugin Configuration

I use the following plugin configuration:

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "Type  !~ /HEARTBEAT/ && Fields[payload_type] !~ /cbuf_lua_test1/"
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 1
    hostname = "node-6"

This is just a test configuration, but it is good enough to start with.

Loopback error

Please note that it is possible to create an endless loop.

If we use message_matcher = "TRUE" (which means "all messages"), we create an endless loop, because any message injected by the filter triggers the filter again.

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
message_matcher = "TRUE"
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 1

Heka detects the loop and stops the plugin:

<SKIP>
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: attempted to Inject a message to itself
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua' error: Terminated. Reason: timer_event() /usr/share/lma_collector/filters/afd_test2.lua:57: inject_payload() creates a circular reference (matches this plugin's message_matcher)
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': stopped
2016/02/03 18:39:41 Plugin 'heartbeat_filter_lua': has stopped, exiting plugin without shutting down.
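
To see why the matcher from the test configuration avoids the loop, here is a rough Python model of it. It is only an illustration: the real matcher is evaluated by Heka, and treating a missing field as an empty string is an assumption made for this sketch. The Type values are the ones visible in the logged messages on this page.

```python
import re


def matches(msg):
    """Model of: Type !~ /HEARTBEAT/ && Fields[payload_type] !~ /cbuf_lua_test1/"""
    # Assumption of this sketch: a missing field behaves like an empty string.
    payload_type = msg.get("Fields", {}).get("payload_type", "")
    return (re.search("HEARTBEAT", msg["Type"]) is None
            and re.search("cbuf_lua_test1", payload_type) is None)


# An ordinary incoming message and two messages injected by the filter itself.
incoming = {"Type": "log", "Fields": {}}
own_heartbeat = {"Type": "heka.sandbox.HEARTBEAT", "Fields": {"payload_type": "txt"}}
own_cbuf_dump = {"Type": "heka.sandbox-output", "Fields": {"payload_type": "cbuf_lua_test1"}}

results = [matches(m) for m in (incoming, own_heartbeat, own_cbuf_dump)]
```

Only the ordinary message is accepted; both kinds of message produced by the filter are rejected, so they never reach process_message() again.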

InfluxDB

The next step is to check the data in InfluxDB. The simplest way is to use ipython and the influxdb Python client:

import json
import influxdb
client = influxdb.InfluxDBClient("<elastic search server's IP address>", 8086, "lma_user", "lma_password")
client.get_list_database()
 
Out[41]: [{u'name': u'_internal'}, {u'name': u'lma'}]
 
# assumption: the metrics are stored in the "lma" database listed above
client.switch_database("lma")
query = 'select value from AVG_messages_real'
result = client.query(query)
for i in result:  print(json.dumps(i, sort_keys=True, indent=4, separators=(',', ': ') ) )

Output:

[
    {
        "time": "2016-02-05T08:00:02.148Z",
        "value": 51
    },
    {
        "time": "2016-02-05T08:00:03.152Z",
        "value": 51
    },
    {
        "time": "2016-02-05T08:00:04.151Z",
        "value": 51
    }
]

Grafana

Now we can create a simple chart in Grafana:
Screen Shot 2016-02-05 at 16.21.56.png
As you can see in the screenshot, we use the following query to build the chart:

SELECT mean("value") AS "value" FROM "AVG_messages_real" WHERE "hostname" = 'node-6' AND "deployment_id" = '3' AND $timeFilter GROUP BY time(10s)

More details about Grafana are in the plugin documentation.
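
What GROUP BY time(10s) with mean("value") does can be sketched in Python. This is only an illustration of the bucketing idea, not how InfluxDB implements it; the function name mean_per_bucket is hypothetical, the tag filters of the real query are ignored, and the points are the ones returned by the InfluxDB query earlier on this page.

```python
from collections import defaultdict
from datetime import datetime, timezone


def mean_per_bucket(points, bucket_seconds=10):
    """Group points into fixed time windows and average the values in each window."""
    buckets = defaultdict(list)
    for point in points:
        t = datetime.strptime(point["time"], "%Y-%m-%dT%H:%M:%S.%fZ")
        t = t.replace(tzinfo=timezone.utc)
        # Start of the window (Unix seconds) that this point falls into.
        start = int(t.timestamp()) // bucket_seconds * bucket_seconds
        buckets[start].append(point["value"])
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


points = [
    {"time": "2016-02-05T08:00:02.148Z", "value": 51},
    {"time": "2016-02-05T08:00:03.152Z", "value": 51},
    {"time": "2016-02-05T08:00:04.151Z", "value": 51},
]
chart = mean_per_bucket(points)
```

All three points fall into the same 10-second window starting at 08:00:00, so the chart gets a single value for that window.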

ElasticSearch

It is also possible to send data to ElasticSearch. We need this only if we would like to save logs. The test plugin does not generate any logs, but it is still possible to send its messages to ElasticSearch and see them in the Kibana dashboard.

Send data to ElasticSearch

First we need to check ElasticSearch output plugin configuration:

[elasticsearch_output]
type = "ElasticSearchOutput"
message_matcher = "Type == 'log' || Type  == 'notification'"
encoder = "elasticsearch_encoder"
flush_interval = 5000
flush_count = 10
server = "http://192.168.0.3:9200"
use_buffering = false

As we can see, this message_matcher will not match any of the messages generated by the plugin: all of them have Type="heka.sandbox.HEARTBEAT".
So first we need to modify message_matcher in the ElasticSearch output plugin configuration:

message_matcher = "Type == 'log' || Type  == 'notification' || Type  =~ /heartbeat/ "

The message Type is used in the index name, so it must be lower case. So the second step is to modify the plugin code to use a lower-case Type field and to add additional fields.

local debug_message = {
    Type = "heartbeat",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
<SKIP>
    debug_message.Timestamp = ns
    debug_message.Payload = 'AVG_messages: ' .. avg .. ' AVG_messages_real: ' ..  avg_real .. ' active_rows:' ..  active_rows .. ' current_percents:' .. current_percents .. ' delta_percents:' .. delta_percents .. ' critical_delta:' .. critical_delta .. ' is_alert' ..  ' False \n'
 
    debug_message.Fields['deployment_mode'] = "ha_compact"
    debug_message.Fields['environment_label'] = "test2"
    debug_message.Fields['openstack_region'] = "RegionOne"
    debug_message.Fields['deployment_id'] = "3"
    debug_message.Fields['severity_label'] = "INFO"
    debug_message.Fields['programname'] = "hekad"
    debug_message.Fields['openstack_roles'] = "primary-controller"
    debug_message.Fields['openstack_release'] = "2015.1.0-7.0"
 
 
    inject_message(debug_message)

Check data in ElasticSearch

To check that we send valid data and that it was saved in ElasticSearch, we can send a request directly to the ElasticSearch engine. In the example below we send the request from the node where ElasticSearch is running, so we use localhost as the ElasticSearch endpoint.

curl -XGET 'http://127.0.0.1:9200/_search?size=1' -d '
{
    "query" : {
         "match" : {
            "Severity" : 6
         }
     }
}' | python -m json.tool

Example of output:

            {
                "_id": "AVKyN83PDm8nP0Qg-76A",
                "_index": "heka.sandbox.heartbeat-2016.02.05",
                "_score": 1.0,
                "_source": {
                    "EnvVersion": "",
                    "Hostname": "node-6",
                    "Logger": "heartbeat_filter_lua",
                    "Payload": "AVG_messages: 80.666666666667 AVG_messages_real: 82.25 active_rows:9 current_percents:105.77507598784 delta_percents:5.7750759878419 critical_delta:1 is_alert False",
                    "Pid": 0,
                    "Severity": 6,
                    "Timestamp": "2016-02-05T16:15:42",
                    "Type": "heka.sandbox.heartbeat",
                    "Uuid": "f76e534a-fede-4d10-a5ca-1fea89a196e2",
                    "deployment_id": "3",
                    "deployment_mode": "ha_compact",
                    "environment_label": "test2",
                    "openstack_region": "RegionOne",
                    "openstack_release": "2015.1.0-7.0",
                    "openstack_roles": "primary-controller",
                    "programname": "hekad",
                    "severity_label": "INFO"
                },
                "_type": "message"
            },

So we can see that the data is saved in ElasticSearch. More details: http://okfnlabs.org/blog/2013/07/01/elasticsearch-query-tutorial.html

Kibana

To find logs created by the plugin in Kibana we need to do the following:

  • Go to the dashboard configuration (click on the gear):

01 Kibana 3 - Logging, Monitoring and Alerting - Logs 2016-02-07 23-38-03.png

  • Go to "Index" tab (1), check none/_all in index (3) and save (4)

02 Kibana 3 - Logging, Monitoring and Alerting - Logs 2016-02-08 00-09-38.png

  • Find your logs, e.g. using hekad keyword or AVG keyword

03 Kibana 3 - Logging, Monitoring and Alerting - Logs 2016-02-08 00-14-39.png

Nagios

Finally we need to create an alert in Nagios and send data to Nagios via its API.

Nagios configuration

In situations where you cannot use send_nsca to submit passive check results over the network, you can use the Nagios web interface.
In our case it is much faster not to use external tools like send_nsca. It is still possible, though, and is described below in the NSCA section.

  • Define a (dummy) command which always returns "unknown" (check_dummy exit state 3):
define command {
        command_line                   /usr/lib/nagios/plugins/check_dummy 3 'No data received for at least 130 seconds'
        command_name                   return-unknown-node-6-message-rate


}
  • Define service:
define service {
        active_checks_enabled          0
        check_command                  return-unknown-node-6-message-rate
        check_freshness                1
        check_interval                 1
        contact_groups                 openstack
        freshness_threshold            65
        host_name                      node-6
        max_check_attempts             2
        notifications_enabled          0
        passive_checks_enabled         1
        process_perf_data              0
        retry_interval                 1
        service_description            heka-nodes.message-rate
        use                            generic-service
}
  • Restart nagios (service nagios3 restart)
  • Check services (new service is in 'PENDING' state)

00 Nagios Core 2016-02-08 14-32-11.png

  • Manually send data to nagios
#!/bin/bash

NAGIOS_URL="http://192.168.0.3:8001/cgi-bin/cmd.cgi"
HOST="node-6"
SERVICE="heka-nodes.message-rate"
curl -v -u "nagiosadmin:r00tme" \
"${NAGIOS_URL}?cmd_typ=30&cmd_mod=2&host=${HOST}&service=${SERVICE}&plugin_state=0&plugin_output=CheckOK&btnSubmit=Commit"

The NAGIOS_URL, login and password can be found in the Heka Nagios output plugin configuration (/etc/lma_collector/output-nagios_gse_global_clusters.toml). The expected result is HTTP code 200:

* Hostname was NOT found in DNS cache
*   Trying 192.168.0.3...
* Connected to 192.168.0.3 (192.168.0.3) port 8001 (#0)
* Server auth using Basic with user 'nagiosadmin'
> HEAD /cgi-bin/cmd.cgi?cmd_typ=30&cmd_mod=2&host=node-6&service=heka-nodes.message-rate&plugin_state=0&plugin_output=CheckOK&btnSubmit=Commit HTTP/1.1
> Authorization: Basic bmFnaW9zYWRtaW46cjAwdG1l
> User-Agent: curl/7.35.0
> Host: 192.168.0.3:8001
> Accept: */*
>
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Date: Mon, 08 Feb 2016 13:08:38 GMT
Date: Mon, 08 Feb 2016 13:08:38 GMT
* Server Apache/2.4.7 (Ubuntu) is not blacklisted
< Server: Apache/2.4.7 (Ubuntu)
Server: Apache/2.4.7 (Ubuntu)
< Vary: Accept-Encoding
Vary: Accept-Encoding
< Connection: close
Connection: close
< Content-Type: text/html
Content-Type: text/html


01 Nagios Core 2016-02-08 15-09-55.png
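
The request sent by the script above can also be assembled in Python, which is handy for checking the URL and the Authorization header seen in the curl output. This is just a sketch: the function names passive_check_url and basic_auth_header are hypothetical, and nothing is actually sent to Nagios here.

```python
import base64
from urllib.parse import urlencode


def passive_check_url(base_url, host, service, state, output):
    """Build the cmd.cgi URL that submits a passive service check result."""
    params = [
        ("cmd_typ", "30"),
        ("cmd_mod", "2"),
        ("host", host),
        ("service", service),
        ("plugin_state", str(state)),
        ("plugin_output", output),
        ("btnSubmit", "Commit"),
    ]
    return base_url + "?" + urlencode(params)


def basic_auth_header(user, password):
    """Value of the HTTP Authorization header for Basic authentication."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return "Basic " + token


url = passive_check_url(
    "http://192.168.0.3:8001/cgi-bin/cmd.cgi",
    "node-6", "heka-nodes.message-rate", 0, "CheckOK",
)
auth = basic_auth_header("nagiosadmin", "r00tme")
```

The generated URL and header match the request line and the Authorization header in the curl trace above.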

Plugin modification

To send data to Nagios we need to modify the plugin in the following way:

local nagios_message = {
    Type = "simple_nagios",
    Timestamp = nil,
    Severity = 6,
    Fields = {},
    Payload = ""
 
}
 
 
<SKIP>
 
      nagios_message.Timestamp = ns
      nagios_message.Fields['AVG_messages']       = avg
      nagios_message.Fields['AVG_messages_real']  = avg_real
      nagios_message.Fields['active_rows']      = active_rows
      nagios_message.Fields['current_percents'] = current_percents
      nagios_message.Fields['delta_percents']   = delta_percents
      nagios_message.Fields['critical_delta']   = critical_delta
      nagios_message.Fields['service']          = "heka-nodes.message-rate"
      nagios_message.Payload= "heka-nodes.message-rate"
 
      nagios_message.Fields['is_alert']         = 'False'
 
    if delta_percents > critical_delta then
       <SKIP>
      nagios_message.Fields['is_alert']         = 'True'
       <SKIP>
    end
    inject_message(nagios_message)

Final plugin configuration:

[heartbeat_filter_lua]
type = "SandboxFilter"
filename = "/usr/share/lma_collector/filters/afd_test2.lua"
#message_matcher = "TRUE"
#message_matcher = "Type != 'HEARTBEAT'"
message_matcher = "Type  !~ /HEARTBEAT/ && Fields[payload_type] !~ /cbuf_lua_test1/ && Type !~ /heartbeat/ && Type !~ /simple_nagios/"
#ticker_interval = 120
#preserve_data = false
ticker_interval = 1

[heartbeat_filter_lua.config]
    critical_delta = 20
    hostname = "node-6"

Simple Nagios Encoder

To send data to Nagios we need to create (or 'encode', in Heka terms) a URL string and use the HTTP output plugin.
LMA already has a Nagios encoder, but we need a simpler one, and this is just an example of how an encoder can be created.


require "string"
 
function process_message ()
    local hostname = read_message("Hostname")
    local logger   = read_message("Logger")
    local is_alert = read_message("Fields[is_alert]")
    local service  = read_message("Fields[service]")
    -- default: report OK
    local plugin_state = "0"
    local plugin_output = "CheckOK"
 
    -- an alert is reported as CRITICAL (Nagios state 2)
    if is_alert == "True" then
        plugin_state = "2"
        plugin_output = "CheckCritical"
    end
 
    if service == nil then
        service = 'nil_service'
    end
 
    if hostname == nil then
        hostname = 'nil_service'
    end
 
--  "cmd_typ=30&cmd_mod=2&host=${HOST}&service=${SERVICE}&plugin_state=0&plugin_output=CheckOK&btnSubmit=Commit"
 
    -- only encode messages that carry a valid is_alert flag
    if is_alert == "True" or is_alert == "False" then
        inject_payload("txt", "test_payload",
                       string.format("cmd_typ=30&cmd_mod=2&host=%s&service=%s&plugin_state=%s&plugin_output=%s&btnSubmit=Commit", hostname, service, plugin_state, plugin_output))
    end
    return 0
end
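
The encoder logic is easy to test outside Heka with a Python equivalent. This is a sketch, not the plugin: the function name encode_nagios is hypothetical, and unlike string.format in the Lua code, urlencode also percent-escapes special characters (for the host and service names used here the result is the same).

```python
from urllib.parse import urlencode


def encode_nagios(hostname, service, is_alert):
    """Return the form body for cmd.cgi, or None if is_alert is not a valid flag."""
    if is_alert not in ("True", "False"):
        return None
    if is_alert == "True":
        plugin_state, plugin_output = "2", "CheckCritical"
    else:
        plugin_state, plugin_output = "0", "CheckOK"
    return urlencode([
        ("cmd_typ", "30"),
        ("cmd_mod", "2"),
        ("host", hostname or "nil_service"),      # same fallback as the Lua encoder
        ("service", service or "nil_service"),
        ("plugin_state", plugin_state),
        ("plugin_output", plugin_output),
        ("btnSubmit", "Commit"),
    ])


body_alert = encode_nagios("node-6", "heka-nodes.message-rate", "True")
body_ok = encode_nagios("node-6", "heka-nodes.message-rate", "False")
body_skipped = encode_nagios("node-6", "heka-nodes.message-rate", None)
```

Messages without a valid is_alert field produce no payload at all, which is the same guard the Lua encoder applies before calling inject_payload().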

The output configuration has two sections: the first writes data to a file for debugging, and the second sends data to Nagios.
Send data to a file:

[nagios_simple]
type = "FileOutput"
message_matcher = "Fields[aggregator] == NIL && Type =~ /simple_nagios/  && ( Fields[is_alert] == 'True' || Fields[is_alert] == 'False' )"
path = "/var/log/heka-simple-nagios.log"
perm = "666"
encoder = "simple_nagios_encoder"

Send data to nagios:

[nagios_simple_http]
type = "HttpOutput"
message_matcher = "Fields[aggregator] == NIL && Type =~ /simple_nagios/  && ( Fields[is_alert] == 'True' || Fields[is_alert] == 'False' )"
encoder = "simple_nagios_encoder"
address = "http://192.168.0.3:8001/cgi-bin/cmd.cgi"
username = "nagiosadmin"
password = "r00tme"
http_timeout = 2000
method = "POST"
 [nagios_simple_http.headers]
     Content-Type = ["application/x-www-form-urlencoded"]

Debug and results

With the tcpflow tool you can see the requests:

POST /cgi-bin/cmd.cgi HTTP/1.1
Host: 192.168.0.3:8001
User-Agent: Go 1.1 package http
Content-Length: 124
Authorization: Basic bmFnaW9zYWRtaW46cjAwdG1l
Content-Type: application/x-www-form-urlencoded
Accept-Encoding: gzip

cmd_typ=30&cmd_mod=2&host=node-6&service=heka-nodes.message-rate&plugin_state=2&plugin_output=CheckCritical&btnSubmit=Commit

And see the results in Nagios:
00 Extended Information 2016-02-08 23-58-35.png

NSCA

There is another way to send data to Nagios: NSCA.
NSCA (Nagios Service Check Acceptor) is a Linux/Unix daemon that allows you to integrate passive alerts and checks from remote machines and applications with Nagios.
It is not possible to use the os module in filters because of Heka's sandbox limitations (unless you recompile the sandbox module), but it is possible to use it in an output. So another way to send data to the Nagios server is to create an output plugin which calls an external binary.
Note: by default the Nagios server does not accept NSCA messages, so you need to enable the nsca service and allow incoming connections to the NSCA port:

service nsca start
iptables -I INPUT -m tcp -p tcp --dport 5667 -j ACCEPT


Plugin configuration is simple:

[simple_nsca]
type = "SandboxOutput"
filename = "/usr/share/lma_collector/outputs/exec.lua"
message_matcher = "Fields[aggregator] == NIL && Type =~ /simple_nagios/  && ( Fields[is_alert] == 'True' || Fields[is_alert] == 'False' )"
  [simple_nsca.config]
  nagios_server = "192.168.0.3"

Output plugin code:

require "io"
require "os"
require "string"
 
local nagios_server = read_config('nagios_server') or error('nagios_server required')
function process_message()
    local hostname = read_message("Hostname")
    local logger   = read_message("Logger")
    local is_alert = read_message("Fields[is_alert]")
    local service  = read_message("Fields[service]")
 
    local plugin_state="0"
    local plugin_output="CheckOK"
    if is_alert == "True" then
       plugin_state="2"
       plugin_output="CheckCritical"
     end
 
     if service == nil then
        service = 'nil_service'
     end
 
     if hostname == nil then
        hostname = 'nil_service'
     end
 
 
    if is_alert == "True" or is_alert == "False" then
      os.execute(string.format("echo  %s:%s:%s:%s | /usr/sbin/send_nsca  -H %s -d : -c /etc/send_nsca.cfg" ,hostname, service, plugin_state, plugin_output, nagios_server))
-- debug
      os.execute(string.format("echo  %s:%s:%s:%s   /usr/sbin/send_nsca  -H %s -d : -c /etc/send_nsca.cfg >> /tmp/nagios_debug" ,hostname, service, plugin_state, plugin_output, nagios_server))
 
     end
    return 0
end
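
For comparison, here is how the same send_nsca call could look in Python. This is a sketch under assumptions, not part of the plugin: the function names nsca_line and send_nsca are hypothetical, the binary and config paths are the ones used in the Lua code above, and the example only builds the input line without contacting a server.

```python
import subprocess


def nsca_line(hostname, service, state, output, delimiter=":"):
    """One passive check result in the host:service:state:output form piped to send_nsca."""
    return delimiter.join([hostname, service, str(state), output]) + "\n"


def send_nsca(line, nagios_server,
              binary="/usr/sbin/send_nsca", config="/etc/send_nsca.cfg"):
    """Pipe one result line to send_nsca (not called in this example)."""
    # The arguments are passed as a list, so no shell is involved and the
    # message contents cannot be interpreted as shell syntax.
    return subprocess.run(
        [binary, "-H", nagios_server, "-d", ":", "-c", config],
        input=line.encode(),
        check=False,
    )


line = nsca_line("node-6", "heka-nodes.message-rate", 2, "CheckCritical")
```

Passing the line on standard input instead of building an echo pipeline avoids quoting problems if a host or service name ever contains shell metacharacters.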

Of course it is slower than HTTP requests, but it is the only way to send data using pre-compiled proprietary tools to existing proprietary monitoring systems.

Plugin code

Basic version (v1)

This is the basic version without the Nagios code.

require "string"
require "math"
 
require "circular_buffer"
 
 
-- 60 min * 24h = 1 day: 1440 rows, 1 column, 60 sec per row
data1 = circular_buffer.new(60*24, 1, 60)
 
 
 
local COUNT1 = data1:set_header(1, "Messages", "count", "sum")
 
local critical_delta = read_config('critical_delta') or error('critical_delta must be specified!')
 
local hostname = read_config('hostname') or error('hostname must be specified!')
 
 
local c=0
local msg = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
 
 
local debug_message = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
 
 
 
local alert_message = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
 
local influxdb_message = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {},
    Payload = ""
}
 
 
 
function prev_time(step, ts, cbuf)
  rows, columns, seconds_per_row = cbuf:get_configuration()
  return ts - (1000000000 * seconds_per_row * step)
end
 
function process_message()
    local ts = read_message("Timestamp")
    data1:add(ts, 1, 1)
    return 0
end
 
 
function timer_event(ns)
  local Payload
  msg.Fields['lua_heartbeat_2'] = 'True'
  msg.Timestamp = ns
  inject_payload("cbuf_lua_test1", "", data1)
  avg, active_rows = data1:compute("avg", 1)
  if active_rows > 2 then
--  avg_real - avg without the LAST (newest) sample
--  avg = ( a1 + a2 + ... + aN ) / N
--  We need to find ( a2 + ... + aN ) / (N - 1) because the last value may not be fully populated yet
--
--  avg_real = ( (a1 + a2 + ... + aN)/N - a1/N ) * ( N/(N-1) )
--  (a1 + ... + aN)/N = avg, so:
--  avg_real = ( avg - a1/N ) * ( N/(N-1) )
--  N = active_rows
--  a1 = data1:get(ns, 1) (last value)
 
    a1 = data1:get(ns, 1)
    if a1 == nil then
--  nil means 'no messages' = 0 messages
      a1 = 0
    end
 
    N = active_rows
 
    avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
 
    current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
    delta_percents = math.abs(100 - current_percents )
 
    debug_message.Timestamp = ns
    debug_message.Fields['AVG_messages'] = avg
    debug_message.Fields['AVG_messages_real'] = avg_real
    debug_message.Fields['active_rows'] = active_rows
    debug_message.Fields['current_percents'] = current_percents
    debug_message.Fields['delta_percents']   = delta_percents
    debug_message.Fields['critical_delta']   = critical_delta
    debug_message.Fields['is_alert']         = 'False'
    inject_message(debug_message)
 
    influxdb_message.Timestamp = ns
    influxdb_message.Fields['payload_type'] = 'txt'
    influxdb_message.Fields['payload_name'] = 'influxdb'
-- Time is in ms
    influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000)  .. "\n"
 
    inject_message(influxdb_message)
 
    if delta_percents > critical_delta then
      alert_message.Timestamp = ns
      alert_message.Fields['AVG_messages']       = avg
      alert_message.Fields['AVG_messages_real']  = avg_real
      alert_message.Fields['active_rows']      = active_rows
      alert_message.Fields['current_percents'] = current_percents
      alert_message.Fields['delta_percents']   = delta_percents
      alert_message.Fields['critical_delta']   = critical_delta
      alert_message.Fields['is_alert']         = 'True'
      inject_message(alert_message)
    end
  end
 inject_message(msg)
--    return 0
end

Final version

require "string"
require "math"
 
require "circular_buffer"
 
 
-- 60 min * 24h = 1 day: 1440 rows, 1 column, 60 sec per row
data1 = circular_buffer.new(60*24, 1, 60)
 
 
 
local COUNT1 = data1:set_header(1, "Messages", "count", "sum")
 
local critical_delta = read_config('critical_delta') or error('critical_delta must be specified!')
 
local hostname = read_config('hostname') or error('hostname must be specified!')
 
 
local c=0
local msg = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
 
 
local debug_message = {
    Type = "heartbeat",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
 
 
 
local nagios_message = {
    Type = "simple_nagios",
    Timestamp = nil,
    Severity = 6,
    Fields = {},
    Payload = ""
 
}
 
 
 
local alert_message = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {}
}
 
local influxdb_message = {
    Type = "HEARTBEAT",
    Timestamp = nil,
    Severity = 6,
    Fields = {},
    Payload = ""
}
 
 
 
function prev_time(step, ts, cbuf)
  rows, columns, seconds_per_row = cbuf:get_configuration()
  return ts - (1000000000 * seconds_per_row * step)
end
 
function process_message()
    local ts = read_message("Timestamp")
    data1:add(ts, 1, 1)
    return 0
end
 
 
function timer_event(ns)
  local Payload
  msg.Fields['lua_heartbeat_2'] = 'True'
  msg.Timestamp = ns
--  inject_payload("cbuf_lua_test1", "", data1)
  avg, active_rows = data1:compute("avg", 1)
  if active_rows > 2 then
--  avg_real - avg without the LAST (newest) sample
--  avg = ( a1 + a2 + ... + aN ) / N
--  We need to find ( a2 + ... + aN ) / (N - 1) because the last value may not be fully populated yet
--
--  avg_real = ( (a1 + a2 + ... + aN)/N - a1/N ) * ( N/(N-1) )
--  (a1 + ... + aN)/N = avg, so:
--  avg_real = ( avg - a1/N ) * ( N/(N-1) )
--  N = active_rows
--  a1 = data1:get(ns, 1) (last value)
 
    a1 = data1:get(ns, 1)
    if a1 == nil then
--  nil means 'no messages' = 0 messages
      a1 = 0
    end
 
    N = active_rows
 
    avg_real = ( avg - (a1/active_rows) )*( N/(N-1) )
 
    a2=data1:get(prev_time(1, ns, data1),1)
--    a2 = 0
    current_percents = ( data1:get(prev_time(1, ns, data1), 1)*100 ) / avg_real
    delta_percents = math.abs(100 - current_percents )
 
    debug_message.Timestamp = ns
    debug_message.Payload =
       'AVG_messages: ' .. avg .. ' AVG_messages_real: ' ..  avg_real .. ' a2: ' .. a2 ..' active_rows:' .. active_rows ..
       ' current_percents:' .. current_percents .. ' delta_percents:' .. delta_percents .. ' critical_delta:' .. critical_delta .. ' is_alert:' ..  ' False \n'
 
 
    debug_message.Fields['deployment_mode'] = "ha_compact"
    debug_message.Fields['environment_label'] = "test2"
    debug_message.Fields['openstack_region'] = "RegionOne"
    debug_message.Fields['deployment_id'] = "3"
    debug_message.Fields['severity_label'] = "INFO"
    debug_message.Fields['programname'] = "hekad"
    debug_message.Fields['openstack_roles'] = "primary-controller"
    debug_message.Fields['openstack_release'] = "2015.1.0-7.0"
 
 
    inject_message(debug_message)
 
    influxdb_message.Timestamp = ns
    influxdb_message.Fields['payload_type'] = 'txt'
    influxdb_message.Fields['payload_name'] = 'influxdb'
-- Time is in ms
    influxdb_message.Payload = "AVG_messages_real,deployment_id=3,hostname=" .. hostname .. " value="..avg_real .." " .. math.floor(ns/1000000)  .. "\n"
 
    inject_message(influxdb_message)
 
 
      nagios_message.Timestamp = ns
      nagios_message.Fields['a2'] =  a2
      nagios_message.Fields['AVG_messages']       = avg
      nagios_message.Fields['AVG_messages_real']  = avg_real
      nagios_message.Fields['active_rows']      = active_rows
      nagios_message.Fields['current_percents'] = current_percents
      nagios_message.Fields['delta_percents']   = delta_percents
      nagios_message.Fields['critical_delta']   = critical_delta
      nagios_message.Fields['service']          = "heka-nodes.message-rate"
      nagios_message.Payload= "heka-nodes.message-rate"
 
      nagios_message.Fields['is_alert']         = 'False'
 
    if delta_percents > critical_delta then
      alert_message.Timestamp = ns
      alert_message.Fields['AVG_messages']       = avg
      alert_message.Fields['AVG_messages_real']  = avg_real
      alert_message.Fields['active_rows']      = active_rows
      alert_message.Fields['current_percents'] = current_percents
      alert_message.Fields['delta_percents']   = delta_percents
      alert_message.Fields['critical_delta']   = critical_delta
      alert_message.Fields['is_alert']         = 'True'
 
      nagios_message.Fields['is_alert']         = 'True'
      inject_message(alert_message)
    end
    inject_message(nagios_message)
  end
-- inject_message(msg)
--    return 0
end