Decorators
Pipeline telemetry provides a number of decorators that help you to automatically create and store telemetry objects when calling a method on a class (instance). Currently 3 decorators are available:
add_single_usage_telemetry
add_mongo_single_usage_telemetry
add_telemetry
add_mongo_telemetry
add_telemetry
Decorator method add_telemetry adds a telemetry object to the class
instance when a method with this decorator is called. The add_telemetry
decorator checks for existing telemetry objects attached to the class instance.
If an existing telemetry object exists, no new telemetry object will be created.
This allows you to call other instance methods that also have an
add_telemetry decorator. This is helpfull when the datapipeline is
logically split into multiple sub pipelines that can also be used as stand
alone pipelines.
The telemetry object is closed after the method that created the object is finished.
Example calling a decorated method from another decoratod pipeline data method:
from pipeline_telemetry import add_mongo_single_usage_telemetry
from settings import TELEMETRY_PARAMS_1, TELEMETRY_PARAMS_2
class YourDataPipelineActivity:
@add_telemetry(TELEMETRY_PARAMS_1)
def run_data_pipeline_action(self):
# data pipeline logic and
# telemetry storage actions
self.run_data_pipeline_sub_activity()
@add_telemetry(TELEMETRY_PARAMS_2)
def data_pipeline_sub_activity(self):
# data pipeline logic and
# telemetry storage actions
pipeline = DataPipelineActivity()
pipeline.run_data_pipeline_action()
In this example a telemetry object is created based on TELEMETRY_PARAMS_1
settings when run_data_pipeline_action is called. In this method pipeline
logic is executed and telemetry is added to the telemetry object. Also another
pipeline process is started called data_pipeline_sub_activity. This method
has its own add_telemetry decorator where a telemetry object would be
created based on TELEMETRY_PARAMS_2 settings. However because the telemetry
object created by run_data_pipeline_action still exists no new telemetry
object will be created. All telemetry storage actions done by
data_pipeline_sub_activity will be stored in the telemetry object created
by run_data_pipeline_action.
This telemetry object will be closed once run_data_pipeline_action is
finished. This is done automatically by the add_telemetry.
Example directly calling the data_pipeline_sub_activity:
pipeline.data_pipeline_sub_activity()
When you directly call the data_pipeline_sub_activity a telemetry object will be created based upon TELEMETRY_PARAMS_2 settings and all telemetry data created in that method will go to that telemetry object.
add_mongo_telemetry
This decorator works exactly the same as the add_telemetry decorator. The
telemetry objects are however stored in a MongoDB instance
add_single_usage_telemetry
Decorator method add_single_usage_telemetry also adds a telemetry object to
the class instance when a method with this decorator is called. The telemetry
object is however renewed every time the decorated method is called (or
overwritten when another decorator is called). Hence the name
single_usage_telemetry.
When using this decorator you should not call other methods with telemetry decorators. Its use is aimed at methods that handle all the telemetry updates within the scope of a single data pipeline method.
Example:
from pipeline_telemetry import add_mongo_single_usage_telemetry
class DataPipelineActivity:
TELEMETRY_PARAMS = TELEMETRY_PARAMS_FOR_THIS_CLASS§
@add_single_usage_telemetry()
def run_data_pipeline_action(self):
# data pipeline logic and
# telemetry storage actions
pipeline = DataPipelineActivity()
pipeline.run_data_pipeline_action()
pipeline.run_data_pipeline_action()
In this example 2 telemetry objects are seperately created and stored in MongoDB. The telemetry objects can be enriched with telemetry from with the scope of the method. As soon as the method is finished the telemetry object will be stored and closed.
telemetry parameters for add_single_usage_telemetry
When using the add_single_usage_telemetry decorator you will have to define
the telemetry details at class level. This can be done in the following way:
class DataPipelineActivity:
TELEMETRY_PARAMS = TELEMETRY_PARAMS_FOR_THIS_CLASS
Where class attribute TELEMETRY_PARAMS is assigned to a valid set of
telemetry parameters. All methods decorated with add_single_usage_telemetry
type decorators will use class attribute TELEMETRY_PARAMS as the source for
telemetry parameters.
add_mongo_single_usage_telemetry
This decorator works exactly the same as the add_single_usage_telemetry
decorator. The telemetry objects are however stored in a MongoDB instance
Preset process_type counter
It is possible to add a sub_process counter to the telemetry object directly
from the single usage decorators. The sub_process must be predefined in the
ProcessType provided by TELEMETRY_PARAMS_FOR_THIS_ACTIVITY:
from pipeline_telemetry import \
add_mongo_single_usage_telemetry, ProcessType, Telemetry
from settings import TELEMETRY_PARAMS_FOR_THIS_CLASS
class DataPipelineActivity:
TELEMETRY_PARAMS = TELEMETRY_PARAMS_FOR_THIS_CLASS
@add_mongo_single_usage_telemetry(
sub_process='ADDRESS_FROM_COORDINATES')
def run_data_pipeline_action(self):
# data pipeline logic
# telemetry storage actions
DataPipelineActivity().run_data_pipeline_action()
In this exampe a telemetry object is stored in MongoDB with a sub_process ‘ADDRESS_FROM_COORDINATES’ with the counter set to 1. In the decorated method itself telemetry updates are allowed but no longer needed to ensure that the sub_process counter for the given sub_process is set. This setup is aimed a simple pipeline activities that need to only basis counting. You will only need to decorate the method that needs to create the telemetry object in order to start recording the telemetry data.