Python Analysis Creation

Analysis I/O

An analysis module is a Python script that follows a general template: it defines a class whose name must match the name of the script itself, with two class functions, __init__ and get_data. The module is first initialized and then get_data is called. get_data should return a pandas DataFrame or a NumSOS DataSet (preferably the former if you are using Python 3). Below are the variables passed from the Grafana interface to these class functions.

__init__
  • cont - A Sos.Container object which contains the path information to the SOS container specified in the Grafana query

  • start - The beginning of the time range of the Grafana query (in epoch time).

  • end - The end of the time range of the Grafana query (in epoch time).

  • schema - The LDMS schema specified by the Grafana query (e.g. meminfo).

  • maxDataPoints - The maximum number of data points that Grafana can display on the user’s screen.

get_data
  • metrics - A Python list of the metrics specified by the Grafana query (e.g. ['Active','MemFree']).

  • job_id - A string of the job_id specified by the Grafana query.

  • user_name - A string of the user name specified by the Grafana query.

  • params - A string of the extra parameters specified by the Grafana query (e.g. 'threshold = 10').

  • filters - A Python list of filter strings for the DSOS query (e.g. ['job_id == 30','component_id < 604']).

Example Analysis Module

Below is a basic analysis that simply queries the database and returns the DataFrame of the metrics passed in along with the timestamp, component_id, and job_id for each metric.

import os, sys, traceback
import datetime as dt
from graf_analysis.grafanaAnalysis import Analysis
from sosdb import Sos
import pandas as pd
import numpy as np
class dsosTemplate(Analysis):
    def __init__(self, cont, start, end, schema='job_id', maxDataPoints=4096):
        super().__init__(cont, start, end, schema, 1000000)

    def get_data(self, metrics, filters=[], params=None):
        try:
            # Assemble the DSOS query: select <metrics> from <schema> <where> <order_by>
            sel = f'select {",".join(metrics)} from {self.schema}'
            where_clause = self.get_where(filters)
            order = 'time_job_comp'
            orderby = 'order_by ' + order
            self.query.select(f'{sel} {where_clause} {orderby}')
            res = self.get_all_data(self.query)
            # Fun stuff here!
            print(res.head())
            return res
        except Exception as e:
            exc_type, exc_obj, tb = sys.exc_info()
            print(str(e)+' '+str(tb.tb_lineno))

In the __init__ function, the super().__init__() call stores the arguments as self variables so that they can be accessed later in get_data. It also creates self.query, which is a Sos.SqlQuery object. The 1000000 passed to super() sets the block size for this self.query object; the optimal block size depends on the query, but one million has been sufficiently performant to this point.
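
For orientation, the base class __init__ behaves roughly like the sketch below. This is only an illustration of the behavior described above, not the actual graf_analysis implementation; consult the installed grafanaAnalysis module for the authoritative version.

class Analysis(object):
    def __init__(self, cont, start, end, schema='meminfo', maxDataPoints=4096):
        # Store the Grafana query parameters for later use in get_data
        # (Sos is imported from sosdb, as in the example module above)
        self.cont = cont                    # open Sos.Container handle
        self.start = start                  # start of the time range (epoch seconds)
        self.end = end                      # end of the time range (epoch seconds)
        self.schema = schema                # LDMS schema name, e.g. 'meminfo'
        self.maxDataPoints = maxDataPoints
        # The second argument is the block size: the maximum number of
        # records returned by each call to self.query.next()
        self.query = Sos.SqlQuery(cont, maxDataPoints)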

In the get_data function we create the select clause for the DSOS query by joining the metrics list with the schema name. self.get_where is a graf_analysis class function that takes the filter parameters and builds an SQL-like where clause string, using self.start and self.end as timestamp boundaries. The orderby variable, set to time_job_comp here, specifies which index to use when querying the database. Our SOS databases are set up to use permutations of timestamp, job ID, and component ID as multi-indices, so depending on your filters you may want to use a different multi-index.

self.get_all_data takes the Sos.SqlQuery object, self.query, and calls self.query.next. This returns up to a block size number of records that match the query from the database defined by the cont variable. If there are more than a block size number of records, it keeps calling self.query.next and appending the results to a pandas DataFrame until all data has been returned.
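
Conceptually, the loop inside get_all_data looks something like the sketch below. This is a simplified illustration that assumes self.query.next() returns a pandas DataFrame (or None when the query is exhausted); the real implementation is provided by the graf_analysis base class.

def get_all_data(self, query):
    # Fetch the first block of results; None means no matching records
    res = query.next()
    if res is None:
        return pd.DataFrame()
    # Keep fetching blocks and appending them until the query is exhausted
    while True:
        block = query.next()
        if block is None:
            break
        res = pd.concat([res, block], ignore_index=True)
    return res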

Additional analysis can be added where the “Fun stuff here!” comment is.
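
As a hypothetical example of such additional analysis, the lines below could replace that comment to smooth each requested metric with a rolling mean while keeping the timestamp, component_id, and job_id columns, so the result can still be returned as a time series:

# Sort by time and apply a 10-sample rolling average to each metric
res = res.sort_values('timestamp')
res[metrics] = res[metrics].rolling(window=10, min_periods=1).mean()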

With the example parameters specified in the last section, our select statement here would be select Active,MemFree from meminfo where timestamp > start and timestamp < end and job_id == 30 and component_id < 604 order_by time_job_comp.

Note

job_id and user_name must exist in the schema passed in for this command to work.

Testing an Analysis Module

This section goes over how to test your Python analysis module as a user.

You do not need to query from the Grafana interface to test your module. Below is a simple script that mimics the Grafana pipeline and prints the JSON returned to Grafana.

Note

If Grafana and SOS are already installed on your system, please skip the `Required Scripts`_ section and ask your system administrator where these scripts reside on the system. You can then copy all necessary python scripts and modules to your home directory, edit or modify existing python analysis modules, and create new ones.

First, set up your environment so that Python can find the SOS and analysis packages (replace <INSTALL_PATH> and <PYTHON_VERSION> with the values for your installation):

export PYTHONPATH=/usr/bin/python:/<INSTALL_PATH>/lib/python<PYTHON_VERSION>/site-packages/
export PATH=/usr/bin:/<INSTALL_PATH>/bin:/<INSTALL_PATH>/sbin:$PATH

Then you can imitate the Grafana query to call your analysis module using a python script such as:

#!/usr/bin/python3

import time, sys
from sosdb import Sos
from grafanaFormatter import DataFormatter
from table_formatter import table_formatter
from time_series_formatter import time_series_formatter
from dsosTemplate import dsosTemplate

# Open a DSOS session and the container to query
sess = Sos.Session("/<DSOS_CONFIG_PATH>/config/dsos.conf")
cont = '<PATH_TO_DATABASE>'
cont = sess.open(cont)

# Mimic the Grafana query: the last 5 minutes of the meminfo schema
model = dsosTemplate(cont, time.time()-300, time.time(), schema='meminfo', maxDataPoints=4096)

# Call the analysis with the metrics (and optional filters/params) of interest
x = model.get_data(['Active'])

# Format the result as the JSON Grafana expects
#fmt = table_formatter(x)
fmt = time_series_formatter(x)
x = fmt.ret_json()
print(x)
  • The model.get_data call is where you define which metrics to collect (in this case "Active") and which filters and extra parameters to add to your query. The syntax is as follows: (['<metric>'], filters=['job_id>0'], params='<variable>'). See the examples after this list.

  • If you would like to query all metrics, replace Active with *.

  • To query a specific job_id, set job_id to your job_id with == in the filters.

  • To query a specific time range, replace the start time, time.time()-300, and the end time, time.time(), with epoch timestamps.

  • To add a string metric, filter, or parameter, you must include a double quote, ", before and after the string (i.e. filters=['user=="myusername"']).
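
A few illustrative variations of the get_data call, following the syntax above (the job ID, user name, and epoch timestamps here are hypothetical placeholders):

# All metrics for one job over an explicit epoch time range
model = dsosTemplate(cont, 1700000000, 1700003600, schema='meminfo', maxDataPoints=4096)
x = model.get_data(['*'], filters=['job_id==12345'])

# Two metrics filtered by user name (note the embedded double quotes),
# with an extra parameter passed through to the analysis module
x = model.get_data(['Active','MemFree'],
                   filters=['user=="myusername"'],
                   params='threshold=10')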

Note

The params argument can be any number or string that you want your analysis module to interpret in order to better manage, output, or analyze the data. For example, you can program your module to return specific analyses, such as the average, with params='analysis=average': parse the argument, use if statements to determine which analysis to apply to the data, and, to keep things clean, put the calculations in their own function.
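
As a sketch of that idea, assuming the params string has the form 'analysis=average' as in the example above and that res is the DataFrame returned by self.get_all_data, get_data could handle params like this:

# Hypothetical params handling inside get_data
if params and 'analysis=' in params:
    analysis = params.split('analysis=')[1].strip()
    if analysis == 'average':
        # Average each requested metric across components at each timestamp
        res = res.groupby('timestamp', as_index=False)[metrics].mean()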

Required Scripts

The following scripts are needed to run the python analysis module. If these python scripts or modules do not exist on your system and you have no way of accessing them, then please continue; otherwise, you can skip this section.

If you do not have access to these existing scripts, please create them in the same directory as your python analysis module.

Note

If Grafana and SOS are installed on your system, please ask your system administrator where these files reside on the system so that you can copy them to your home directory.

grafanaFormatter:

from sosdb import Sos
from sosdb.DataSet import DataSet
import numpy as np
import pandas as pd
import copy

class RowIter(object):
    def __init__(self, dataSet):
        self.dset = dataSet
        self.limit = dataSet.get_series_size()
        self.row_no = 0

    def __iter__(self):
        return self

    def cvt(self, value):
        if type(value) == np.datetime64:
            return [ value.astype(np.int64) / 1000 ]
        return value

    def __next__(self):
        if self.row_no >= self.limit:
            raise StopIteration
        res = [ self.cvt(self.dset[[col, self.row_no]]) for col in range(0, self.dset.series_count) ]
        self.row_no += 1
        return res

class DataFormatter(object):
    def __init__(self, data):
        self.result = []
        self.data = data
        # Dispatch on the module of the data object's type: a sosdb DataSet,
        # a pandas DataFrame, or a builtin (e.g. None)
        self.fmt = type(self.data).__module__
        self.fmt_data = {
            'sosdb.DataSet' : self.fmt_dataset,
            'pandas.core.frame' : self.fmt_dataframe,
            'builtins' : self.fmt_builtins
        }

    def ret_json(self):
        return self.fmt_data[self.fmt]()

    def fmt_dataset(self):
        pass

    def fmt_dataframe(self):
        pass

    def fmt_builtins(self):
        pass

table_formatter:

from graf_analysis.grafanaFormatter import DataFormatter, RowIter
from sosdb.DataSet import DataSet
from sosdb import Sos
import numpy as np
import pandas as pd
import copy

class table_formatter(DataFormatter):
    def fmt_dataset(self):
        # Format data from sosdb DataSet object
        if self.data is None:
            return {"columns" : [{ "text" : "No papi jobs in time range" }] }

        self.result = { "type" : "table" }
        self.result["columns"] = [ { "text" : colName } for colName in self.data.series ]
        rows = []
        for row in RowIter(self.data):
            rows.append(row)
        self.result["rows"] = rows
        return self.result

    def fmt_dataframe(self):
        if self.data is None:
            return {"columns" : [{ "text" : "No papi jobs in time range" }] }

        self.result = { "type" : "table" }
        self.result["columns"] = [ { "text" : colName } for colName in self.data.columns ]
        self.result["rows"] = self.data.to_numpy()
        return self.result

    def fmt_builtins(self):
        if self.data is None:
            return { "columns" : [], "rows" : [], "type" : "table" }
        else:
            return self.data
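
For reference, the structure these table formatters return (and that Grafana's table panel consumes) looks roughly like the following; the column names and values are purely illustrative:

{
    "type"    : "table",
    "columns" : [ { "text" : "timestamp" }, { "text" : "component_id" }, { "text" : "Active" } ],
    "rows"    : [ [ <timestamp>, 604, 1052672 ], ... ]
}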

time_series_formatter:

from graf_analysis.grafanaFormatter import DataFormatter
from sosdb.DataSet import DataSet
from sosdb import Sos
import numpy as np
import pandas as pd
import copy

class time_series_formatter(DataFormatter):
    def fmt_dataset(self):
        # timestamp is always last series
        if self.data is None:
            return [ { "target" : "", "datapoints" : [] } ]

        for series in self.data.series:
            if series == 'timestamp':
                continue
            ds = DataSet()
            ds.append_series(self.data, series_list=[series, 'timestamp'])
            plt_dict = { "target" : series }
            plt_dict['datapoints'] = ds.tolist()
            self.result.append(plt_dict)
            del ds
        return self.result

    def fmt_dataframe(self):
        if self.data is None:
            return [ { "target" : "", "datapoints" : [] } ]

        for series in self.data.columns:
            if series == 'timestamp':
                continue
            plt_dict = { "target" : series }
            plt_dict['datapoints'] = self.fmt_datapoints([series, 'timestamp'])
            self.result.append(plt_dict)
        return self.result

    def fmt_datapoints(self, series):
        ''' Format dataframe to output expected by grafana '''
        aSet = []
        for row_no in range(0, len(self.data)):
            aRow = []
            for col in series:
                v = self.data[col].values[row_no]
                typ = type(v)
                if typ.__module__ == 'builtins':
                    pass
                elif typ == np.ndarray or typ == np.string_ or typ == np.str_:
                    v = str(v)
                elif typ == np.float32 or typ == np.float64:
                    v = float(v)
                elif typ == np.int64 or typ == np.uint64:
                    v = int(v)
                elif typ == np.int32 or typ == np.uint32:
                    v = int(v)
                elif typ == np.int16 or typ == np.uint16:
                    v = int(v)
                elif typ == np.datetime64:
                    # convert datetime64 (nanoseconds) to milliseconds for Grafana
                    v = v.astype(np.int64) / int(1e6)
                else:
                    raise ValueError("Unrecognized numpy type {0}".format(typ))
                aRow.append(v)
            aSet.append(aRow)
        return aSet

    def fmt_builtins(self):
        if self.data is None:
            return [ { "target" : "", "datapoints" : [] } ]
        else:
            return self.data
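
With this formatter, the JSON returned to Grafana is a list with one entry per metric, where each datapoint is a [value, timestamp-in-milliseconds] pair; for example (values illustrative):

[
    { "target" : "Active", "datapoints" : [ [1052672.0, 1700000000000], [1052676.0, 1700000001000] ] }
]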