Collectd

Материал из Wiki
Перейти к: навигация, поиск

Collectd

Collectd is a simple data collector which use plugins to collect data and output plugins to send data to another tool (heka is used in MOS LMA)
Collectd collects the following metrics:

Metrics

  • cpu (CPU usage)
  • df (disk usage/free size)
  • disk (disk usage/IOPS)
  • interface (interface usage/bytes sent and received )
  • load (Linux LA)
  • memory (memory usage)
  • processes (detailed monitoring of collect and hekad)
  • swap (swap usage)
  • openstack metrics (python plugin)
  • other metrics
  • your custom metrics if added

Plugins in collectd

Please see plugin details on collectd man page

Read

Collectd uses read plugins to read data from sources like files, /proc FS or services like apache nginx or haproxy.
In LMA there are 2 groups of read plugins:

* collectd default plugins. This group of plugins has been designed by collectd team and  collects the ’well-known' data metric  like CPU usage, I/O usage etc.

e.g, CPU plugin:

<LoadPlugin cpu>
  Globals false
</LoadPlugin>
  • custom plugins. This group of collectd plugins has been created in order to get openstack-related metrics, HAProxy metrics, RabbitMQ metrics , Elasticsearch and other metrics.

It can be plugins that was wrote on python or database plugins with customize queries. e.g. dbi plugins (dbi_cinder_*.conf) and python plugins from openstack.conf file.

Write

Collectd uses the write plugins for output.
Default LMA configuration involves the only one write plugin which sends data to Heka:

<LoadPlugin write_http>
  Globals false
</LoadPlugin>

<Plugin "write_http">
  <URL "http://127.0.0.1:8325">
    Format "JSON"
    StoreRates true
  </URL>
</Plugin>


The Hekad daemon is listening on 127.0.0.1:8325

# netstat -ntpl | grep 8325 tcp 0 0 127.0.0.1:8325 0.0.0.0:* LISTEN 15368/hekad

More details about Heka configuration you can find in Heka section (http://wiki.sirmax.noname.com.ua/index.php/Heka).
It is possible to add any write plugin you need. Write plugin set up for debugging is described below. One thing to note regarding the traffic from collectd to Heka is that collectd will batch many metrics in a single HTTP call to be more efficient

Other plugins

For logging there is log plugin.

<LoadPlugin logfile>
  Globals false
</LoadPlugin>

<Plugin logfile>
  LogLevel warning
  File "/var/log/collectd.log"
  Timestamp true
</Plugin>

Custom read Plugin

The simple python plugin is a plugin which was created for a better understanding of collectd python plugins.


  • plugin is controlled by 'control file'
  • value in control file enables/disables plugin.
  • plugin reads data from 'data file' only if value in 'control file' greater than zero otherwise it doing nothig.
  • data file, config file and resource name are configurable.
  • collectd notifications mechanism use only to demonstrate it's possibilities.

Plugin configuration

Add the following code to openstack.conf file:

    Import read_file
    <Module "read_file">
        DataFile = "/var/log/collectd_in_data"
        ConfigFile = "/var/log/collectd_in_data_config"
        DependsOnResource = "MyCustomTestResource"
    </Module>

Variables inside <Module> section can be read in plugin during run-time. In this example we pass to plugin 'data file', 'configuration file' and 'resource name'.

Plugin code

Plugin code is .py file, located in /usr/lib/collectd. In our case this is /usr/lib/collectd/read_file.py

Pay your attention - read_file.py is module name in configuration above so it is necessary to do import in collectd config: Import read_file

 
import collectd
import pprint
import json
 
 
class pluginTest():
 
  do_collect_data = False;
  depends_on_resource = ""
  config_file=""
  plugin_description='read_file_demo_plugin'
  data_file=""
 
 
  def configure_callback(self, conf):
    for c in conf.children:
      if c.key  == 'DataFile':
        self.data_file = c.values[0]
      elif c.key == 'DependsOnResource':
        self.depends_on_resource = c.values[0]
      elif c.key == 'ConfigFile':
        self.config_file = c.values[0]
      else:
        collectd.warning ('%s Unknown config key: %s.' % (self.plugin_description, c.key))
    collectd.error('%s : Configured with data_file=%s config_file=%s' % (self.plugin_description, self.data_file, self.config_file))
 
 
  def notification_callback(self, notification):
    try:
      data = json.loads(notification.message)
    except ValueError:
      return
    if 'value' not in data:
      collectd.warning ('%s : READ NOTIFICATION: missing value %s.' % ( self.plugin_description, self.__class__.__name__) )
    elif 'resource' not in data:
      collectd.warning ('%s : READ NOTIFICATION: missing resource %s.' % (self.plugin_description, self.__class__.__name__ ) )
    elif data['resource'] == self.depends_on_resource:
      do_collect_data = data['value'] > 0
      collectd.warning ("%s: %s: do_collect_data=%s" % (self.plugin_description, self.__class__.__name__, do_collect_data))
      self.do_collect_data = do_collect_data
 
 
  def read_callback(self):
    collectd.warning ('READ NOTIFICATION____VAR: %s.' % (self.do_collect_data ) )
    if self.do_collect_data % 2 == 0:
      collectd.warning ('READ NOTIFICATION____VAR___EVEN: %s.' % (self.do_collect_data ) )
    self.check_config()
    if self.do_collect_data:
      f_data_file = open(self.data_file,'r')
      value=f_data_file.readline()
      vl = collectd.Values(
            plugin=self.plugin_description,  # metric source
            plugin_instance='read_file_plugin_instance',
            type='gauge',
            type_instance='type_instance',
            # w/a for https://github.com/collectd/collectd/issues/716
            meta={'0': True},
            values=[value]
       )
      vl.dispatch()
      f_data_file.close()
 
  def check_config(self):
    f_config_file = open(self.config_file)
    config_value=f_config_file.readline()
    f_config_file.close()
    collectd.warning ("%s: config_value=%s" % (self.plugin_description, config_value))
    n = collectd.Notification()
    if int(config_value) > 0:
      n.dispatch(severity = 4, host = "TestNode", plugin = self.plugin_description, type = "gauge", type_instance = "read_file_plugin_test_instance", message = '{"resource":"'+str(self.depends_on_resource)+'","value":1}')
    else:
      n.dispatch(severity = 4, host = "TestNode", plugin = self.plugin_description, type = "gauge", type_instance = "read_file_plugin_test_instance", message = '{"resource":"'+str(self.depends_on_resource)+'","value":0}')
 
 
 
 
plugin = pluginTest()
 
collectd.register_notification(plugin.notification_callback)
collectd.register_config(plugin.configure_callback)
collectd.register_read(plugin.read_callback)

Code description

How does it work? This is a demo plugin, and all that it does can be implemented in a simpler way The main idea of this code is show how notifications mechanism can be used.

  • Python code loads once on collectd start. So, you need restart collectd if code was changed.
  • On load called function/method registered as config function
collectd.register_config(plugin.configure_callback)
  • This method reads config and initializes variables:
    • self.data_file - file with data (should be pre-created)
    • self.depends_on_resource - name of "resource". This is just variable for identify plugin instance. Also it is used in the filter notifications. (please see notification explanation section below for details)
    • self.config_file - config file, in test plugin only 2 values are possible to use - zero and any other positive number.


  • Each "interval" called method registered as
collectd.register_read(plugin.read_callback)

Interval is global collectd configuration parameter: collectd global options details

  • read function do the follwing:
    • writes to log self.do_collect_data (do we need collect data now?)
    • check if self.do_collect_data even or odd and if it is even write one more message. This part of code is just shows that we can operate with variables passed via notification mechanism and implement any logic we need
    • calls self.check_config() method (will be described below)
    • if self.do_collect_data is "True" read data from data file (please see config section) and publish it in collectd.


  • on each notification calles registered method:
collectd.register_notification(plugin.notification_callback)
  • This method do the following:
    • Loads message from notification if it possble: data = json.loads(notification.message)
    • If message contain resource we configured above, read data and change do_collect_data. Resource is used to filter messages we do not need. Any kind of logic can be implemented, e.g. we can read notification of other plugins.


  • Method check_config(self) calling on each read and do the following:
    • Read config file (only first line)
    • If value from config file is greater than zero (if int(config_value) > 0) it sends notification where resource is pre-configured DependsOnResource message = '{"resource":"'+str(self.depends_on_resource)+'","value":1}')
    • Otherwise send notification with "value":0


This notification is received by all plugins with register_notification configured include this plugin also
So we get this notification in notification_callback, and set self.do_collect_data True if value = 0 and false if Value > 0.
Now we are able to switch on or off data collecting in our plugin:

echo 0 > /var/log/collectd_in_data_config

in collectd.log it is possible to see what is going on

[2016-01-26 15:54:16] Notification: severity = OKAY, host = TestNode, plugin = read_file_demo_plugin, type = gauge, type_instance = read_file_plugin_test_instance, message = {"resource":"MyCustomTestResource","value":1}
[2016-01-26 15:54:16] read_file_demo_plugin: pluginTest: do_collect_data=True
[2016-01-26 15:54:26] READ NOTIFICATION____VAR: True.
[2016-01-26 15:54:26] read_file_demo_plugin: config_value=0


echo 99 > /var/log/collectd_in_data_config
[2016-01-26 15:57:26] Notification: severity = OKAY, host = TestNode, plugin = read_file_demo_plugin, type = gauge, type_instance = read_file_plugin_test_instance, message = {"resource":"MyCustomTestResource","value":1}
[2016-01-26 15:57:26] read_file_demo_plugin: pluginTest: do_collect_data=True
[2016-01-26 15:57:36] READ NOTIFICATION____VAR: True.
[2016-01-26 15:57:36] read_file_demo_plugin: config_value=99


It looks little bit tricky but it is just an example how we can add read plugin and use notification to send data between plugins.
You can find more details about python plugin in collectd man pages

Chain

The main idea of this part of colectd configuration is avoid data duplication from openstack-reated plugins.
Openstack-related plugins catches notifications and depends on value are able to read data or not to read.Example of such behavior was provided in read plugin example above.
So plugin pacemaker_resource is designed to notify openstack-related plugins are they running on 'master'. And 'Master' can be detected as controller node with

vip__management ( Management Virtual IP ) on it. 
<Chain "PostCache">
  <Rule>
    <Match "regex">
      Plugin "^pacemaker_resource$"
      TypeInstance "^vip__management$"
    </Match>
    <Target "notification">
      Message "{\"resource\":\"%{type_instance}\",\"value\":%{ds:value}}"
      Severity "OKAY"
    </Target>
  </Rule>
   Target "write"
</Chain>

This rule creates notifications and add record to log file. As you can see "value":1 so this is message from 'Master' controller.

[2016-01-20 14:00:54] Notification: severity = OKAY, host = node-6, plugin = pacemaker_resource, type = gauge, type_instance = vip__management, message = {"resource":"vip__management","value":1}


The idea of this PostCache rule is create notification ONLY if collectd Plugin matches "^pacemaker_resource$" AND TypeInstance mathes "^vip__management$" . Target creates notification with value -> :%{ds:value}. It means notification plugin takes data from collectd data record.
Here is one of such records:

collectd.Values(type='gauge',type_instance='vip__management',plugin='pacemaker_resource',host='node-6',time=1453826506.4363616,interval=10.0,values=[1.0],meta={'0': True})

And pacemaker_resource plugin is designed to identify is current node has resource.
Plugin pacemaker_resource by default has the follwing configuration:

<Module "pacemaker_resource">
    Resource "vip__management"
    Resource "vip__public"
    Resource "vip__vrouter"
    Resource "vip__vrouter_pub"
</Module>

In plugin code (/usr/lib/collectd/pacemaker_resource.py) you can find the following part:

        for resource in self.resources:
            out, err = self.execute([self.crm_resource_bin, '--locate',
                                     '--quiet', '--resource', resource],
                                    shell=False)
            if not out:
                self.logger.error("%s: Failed to get the status for '%s'" %
                                  (self.plugin, resource))
 
            else:
                value = 0
                if self.hostname == out.lstrip("\n"):
                    value = 1
                yield {
                    'type_instance': resource,
                    'values': value
                }

This code returns value = 1 if CRM resource is running on current host.
To check resource status, you can use /usr/sbin/crm_resource tool:

root@node-6:# /usr/sbin/crm_resource --locate --resource vip__management --quiet node-6.domain.tld

As you can see from output above, command runs on node-6 and returns node-6 so plugin returns value=1

As soon as notification was created, it can be read by any other plugins.
So Openstack-related plugins will read notification and 'understand' do they need to read data from OpenStack API.
As result, only one of many controllers in HA configuration will send data to centralized logging, but in case of fail 'master' controller crm resource will be moved to another node. On this node pacemaker_resource will detect status change and notify openstack-related plugins.

More details about collectd chains and filters.

Debug

There are some ways to debug data flow in collectd:

Debug http traffic

It is possible to debug data tranfering from collectd to hekad. e.g. you can use tcpflow or you favorite tool to dump http traffic
Run dumping tool:

  • heka is listen on port 8325, taken from write_http config
  • lo interface is loopback, heka is listen on 127.0.0.1, so it is easy to find interface
    • # ip ro get 127.0.0.1 local 127.0.0.1 dev lo src 127.0.0.1 cache <local>
    • dev lo is device you need.
  • # tcpflow -i lo port 8325
  • Example of output:
    # cat 127.000.000.001.45848-127.000.000.001.08325 | head -8 POST / HTTP/1.1 User-Agent: collectd/5.4.0.git Host: 127.0.0.1:8325 Accept: */* Content-Type: application/json Content-Length: 4064 [{"values":[2160],"dstypes":["gauge"],"dsnames":["value"],"time":1453203196.259,"interval":10.000,"host":"node-7","plugin":"processes","plugin_instance":"collectd","type":"ps_stacksize","type_instance":""},{"values":[0,1999.74],"dstypes":["derive","derive"],"dsnames": ... skip ...


  • Example of output custom read_file plugin described above formatted with json.tool python lib.


[
    {
        "dsnames": [
            "value"
        ],
        "dstypes": [
            "gauge"
        ],
        "host": "node-6",
        "interval": 10.0,
        "meta": {
            "0": true
        },
        "plugin": "read_file_demo_plugin",
        "plugin_instance": "read_file_plugin_instance",
        "time": 1453911212.246,
        "type": "gauge",
        "type_instance": "type_instance",
        "values": [
            889000000.0
        ]
    }
]

Debug with your own write plugin

One more way to debug is create your own write plugin and write all you need.
For example I created simple write plugin (using python)

  • Create plugin configuration, e.g. in /etc/collectd/conf.d/openstack.conf
    Import write_file
    <Module "write_file">
        log_filename = "/var/log/collectd_debug.log"
    </Module>

Create file /usr/lib/collectd/write_file.py (depends on your ModulePath, by-default it is "/usr/lib/collectd")

 
import collectd
 
 
def configure_callback(conf):
  global f_log_file
 
  for c in conf.children:
    if c.key  == 'log_filename':
      log_filename = c.values[0]
    else:
      collectd.warning ('log_file_info plugin: Unknown config key: %s.' % c.key)
  collectd.error('Configured with log_filename=%s' % (log_filename))
  f_log_file = open(log_filename,'w')
 
 
 
def write_callback(vl, data=None):
  for i in vl.values:
    #collectd.error("write_file: %s (%s): %f" % (vl.plugin, vl.type, i))
    f_log_file.write("plugin=%s plugin_instance=%s  type_instance=%s type=%s value=%s \n" % (vl.plugin, vl.plugin_instance, vl.type_instance, vl.type, i))
 
collectd.register_config(configure_callback)
collectd.register_write(write_callback)


  • Example of output read_file custom plugin:
cat /var/log/collectd_debug.log | grep read_file | head -10 collectd.Values(type='gauge',type_instance='type_instance',plugin='read_file_demo_plugin',plugin_instance='read_file_plugin_instance',host='node-6',time=1453910962.4182441,interval=10.0,values=[888999888.0],meta={'0': True}) read_file_demo_plugin (gauge): 888999888.000000 collectd.Values(type='gauge',type_instance='type_instance',plugin='read_file_demo_plugin',plugin_instance='read_file_plugin_instance',host='node-6',time=1453910972.2508557,interval=10.0,values=[888999888.0],meta={'0': True}) read_file_demo_plugin (gauge): 888999888.000000 collectd.Values(type='gauge',type_instance='type_instance',plugin='read_file_demo_plugin',plugin_instance='read_file_plugin_instance',host='node-6',time=1453910982.2580974,interval=10.0,values=[888999888.0],meta={'0': True}) read_file_demo_plugin (gauge): 888999888.000000 collectd.Values(type='gauge',type_instance='type_instance',plugin='read_file_demo_plugin',plugin_instance='read_file_plugin_instance',host='node-6',time=1453910992.2493923,interval=10.0,values=[888999888.0],meta={'0': True}) read_file_demo_plugin (gauge): 888999888.000000 collectd.Values(type='gauge',type_instance='type_instance',plugin='read_file_demo_plugin',plugin_instance='read_file_plugin_instance',host='node-6',time=1453911002.2467947,interval=10.0,values=[888999888.0],meta={'0': True}) read_file_demo_plugin (gauge): 888999888.000000

Debug with unixsock plugin

One more way to get some debug information is using collectd-unixsock.

add config, restart collectd

# cat 98-unixsock.conf
<LoadPlugin  unixsock>
  Globals false
</LoadPlugin>

<Plugin unixsock>
  SocketFile "/var/run/collectd-unixsock"
  SocketGroup "collectd"
  SocketPerms "0770"
  DeleteSocket true
</Plugin>


#collectdctl listval node-6/apache-localhost/apache_bytes node-6/apache-localhost/apache_connections node-6/apache-localhost/apache_idle_workers node-6/apache-localhost/apache_requests node-6/apache-localhost/apache_scoreboard-closing node-6/apache-localhost/apache_scoreboard-dnslookup node-6/apache-localhost/apache_scoreboard-finishing node-6/apache-localhost/apache_scoreboard-idle_cleanup node-6/apache-localhost/apache_scoreboard-keepalive node-6/apache-localhost/apache_scoreboard-logging node-6/apache-localhost/apache_scoreboard-open node-6/apache-localhost/apache_scoreboard-reading node-6/apache-localhost/apache_scoreboard-sending node-6/apache-localhost/apache_scoreboard-starting node-6/apache-localhost/apache_scoreboard-waiting node-6/check_openstack_api-cinder-api/gauge-RegionOne ... Skip ...


# collectdctl getval node-6/swap/swap-free value=1.923355e+09


  • Example of output read_file plugin:
    • Get name of metric:
# collectdctl listval | grep read_file node-6/read_file_demo_plugin-read_file_plugin_instance/gauge-type_instance
    • Get data:
# collectdctl getval node-6/read_file_demo_plugin-read_file_plugin_instance/gauge-type_instance value=8.889999e+08

Data in Heka

In current LMA configuration collectd sends data to heka. So we can find data in heka debug logs. More details about heka configuration ad debugging you can find in Heka chapter, here just short explanation:

  • Add to Heka configuration debug:
[RstEncoder]

[output_file]
type = "FileOutput"
message_matcher = "Fields[aggregator] == NIL"
path = "/var/log/heka-debug.log"
perm = "666"
flush_count = 100
flush_operator = "OR"
encoder = "RstEncoder"

In this config file RstEncoder and FileOutput are configured for write all data into debug log


cat /var/log/heka-debug.log | grep read_file | tail -10 :Payload: {"type":"gauge","values":[889000000],"type_instance":"type_instance","meta":{"0":true},"dsnames":["value"],"plugin":"read_file_demo_plugin","time":1453974100.479,"interval":10,"host":"node-6","dstypes":["gauge"],"plugin_instance":"read_file_plugin_instance"} | name:"source" type:string value:"read_file_demo_plugin" :Payload: {"type":"gauge","values":[889000000],"type_instance":"type_instance","meta":{"0":true},"dsnames":["value"],"plugin":"read_file_demo_plugin","time":1453974110.474,"interval":10,"host":"node-6","dstypes":["gauge"],"plugin_instance":"read_file_plugin_instance"} | name:"source" type:string value:"read_file_demo_plugin" :Payload: {"type":"gauge","values":[889000000],"type_instance":"type_instance","meta":{"0":true},"dsnames":["value"],"plugin":"read_file_demo_plugin","time":1453974120.484,"interval":10,"host":"node-6","dstypes":["gauge"],"plugin_instance":"read_file_plugin_instance"} | name:"source" type:string value:"read_file_demo_plugin" :Payload: {"type":"gauge","values":[889000000],"type_instance":"type_instance","meta":{"0":true},"dsnames":["value"],"plugin":"read_file_demo_plugin","time":1453974130.477,"interval":10,"host":"node-6","dstypes":["gauge"],"plugin_instance":"read_file_plugin_instance"} | name:"source" type:string value:"read_file_demo_plugin" :Payload: {"type":"gauge","values":[889000000],"type_instance":"type_instance","meta":{"0":true},"dsnames":["value"],"plugin":"read_file_demo_plugin","time":1453974140.472,"interval":10,"host":"node-6","dstypes":["gauge"],"plugin_instance":"read_file_plugin_instance"} | name:"source" type:string value:"read_file_demo_plugin"