Telemetry mixin
The TelemetryMixin class gives a data pipeline class easy access to telemetry helper methods. It defines the following methods.
process_errors_from_return_value
Processes the errors from a ReturnValueWithStatus instance. It takes a ReturnValueWithStatus object and the sub_process under which any errors should be recorded in the telemetry object:
from pipeline_telemetry import TelemetryMixin, add_telemetry
from settings import TELEMETRY_PARAMS
from my_pipeline import get_data_from_source


class MyDataPipeline(TelemetryMixin):

    @add_telemetry(TELEMETRY_PARAMS)  # (1)
    def process_data(self) -> None:
        # get data in a ReturnValueWithStatus object
        return_value = get_data_from_source()  # (2)

        # process the errors
        self.process_errors_from_return_value(  # (3)
            return_value=return_value,
            sub_process="GET_DATA_FROM_SOURCE"
        )

        # if return_value is invalid it makes no sense to process
        if not return_value.is_valid:  # (4)
            return

        # process the items of the retrieved data
        for record in return_value.result:  # (5)
            self.process_record(record)
When process_data() is called, the following steps are executed:
(1) A Telemetry object is added to the instance of MyDataPipeline.
(2) The actual data is retrieved. As get_data_from_source is not aware of the Telemetry object, it cannot add any errors it might encounter.
(3) Errors returned by get_data_from_source are added to the telemetry object.
(4) If the return_value object is not valid, processing is skipped.
(5) Each separate record is processed by the instance method process_record. As this method is aware of the telemetry object, you can add additional telemetry data directly from that method.
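The flow above can be sketched without the library installed. The following is a minimal, self-contained illustration, not the actual implementation in pipeline_telemetry: FakeReturnValue, FakeTelemetry, SketchPipeline, and the errors attribute are hypothetical stand-ins that only mimic the attributes named in this section (is_valid and result).

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeReturnValue:
    """Hypothetical stand-in for ReturnValueWithStatus."""
    is_valid: bool
    result: List[Any] = field(default_factory=list)
    errors: List[str] = field(default_factory=list)  # assumed attribute


@dataclass
class FakeTelemetry:
    """Hypothetical stand-in for the telemetry object: errors per sub_process."""
    errors: Dict[str, List[str]] = field(default_factory=dict)


class SketchPipeline:
    def __init__(self) -> None:
        self.telemetry = FakeTelemetry()
        self.processed: List[Any] = []

    def process_errors_from_return_value(
            self, return_value: FakeReturnValue, sub_process: str) -> None:
        # Record every error under the given sub_process.
        if return_value.errors:
            self.telemetry.errors.setdefault(sub_process, []).extend(
                return_value.errors)

    def process_data(self, return_value: FakeReturnValue) -> None:
        # Errors are recorded first, whether or not the result is valid.
        self.process_errors_from_return_value(
            return_value=return_value, sub_process="GET_DATA_FROM_SOURCE")
        # An invalid return value stops processing here.
        if not return_value.is_valid:
            return
        for record in return_value.result:
            self.processed.append(record)


pipeline = SketchPipeline()
pipeline.process_data(FakeReturnValue(is_valid=False, errors=["timeout"]))
pipeline.process_data(FakeReturnValue(is_valid=True, result=[1, 2]))
```

After both calls, the error from the invalid return value is stored under "GET_DATA_FROM_SOURCE", and only the records from the valid return value have been processed.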
process_telemetry_counters_from_return_value
Processes the telemetry counters from a ReturnValueWithStatus instance. It takes a ReturnValueWithStatus object and processes all telemetry counters found in the return_value’s result attribute. The method returns the result attribute without the telemetry counters:
from pipeline_telemetry import TelemetryMixin, add_telemetry
from settings import TELEMETRY_PARAMS
from my_pipeline import get_data_from_source


class MyDataPipeline(TelemetryMixin):

    @add_telemetry(TELEMETRY_PARAMS)
    def process_data(self) -> None:
        # get data in a ReturnValueWithStatus object
        return_value = get_data_from_source()

        # process the telemetry_counters
        result_without_telemetry_counters = (  # (1)
            self.process_telemetry_counters_from_return_value(
                return_value=return_value)
        )

        # process the items of the retrieved data
        for record in result_without_telemetry_counters:  # (2)
            self.process_record(record)
When process_data() is called, the following steps are executed:
(1) All telemetry counters returned in return_value.result by the get_data_from_source() method are added to the telemetry attribute of the MyDataPipeline instance.
(2) Any remaining data items can now be processed by the process_record method.
process_telemetry_counters_from_list
Processes the telemetry counters from a list. The method processes all telemetry counters found in the provided list and returns the list without the telemetry counters:
from pipeline_telemetry import TelemetryMixin, add_telemetry
from settings import TELEMETRY_PARAMS
from my_pipeline import get_data_from_source


class MyDataPipeline(TelemetryMixin):

    @add_telemetry(TELEMETRY_PARAMS)
    def process_data(self) -> None:
        # get data as a list (here get_data_from_source returns a plain list)
        list_with_source_data = get_data_from_source()

        # process the telemetry_counters
        list_without_telemetry_counters = (  # (1)
            self.process_telemetry_counters_from_list(
                result_list=list_with_source_data)
        )

        # process the items of the retrieved data
        for record in list_without_telemetry_counters:  # (2)
            self.process_record(record)
When process_data() is called, the following steps are executed:
(1) All telemetry counters returned in list_with_source_data by the get_data_from_source() method are added to the telemetry attribute of the MyDataPipeline instance.
(2) Any remaining data items can now be processed by the process_record method.
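To make the separation step concrete, here is a minimal sketch of splitting a mixed list into telemetry counters and data records. CounterSketch and split_counters are hypothetical names invented for this illustration; the library's real counter type, its fields, and the way it stores totals may differ.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class CounterSketch:
    """Hypothetical telemetry counter: a named counter plus an increment."""
    sub_process: str
    counter_name: str
    increment: int = 1


def split_counters(items: List[Any], totals: Counter) -> List[Any]:
    """Add every CounterSketch in items to totals; return the other items."""
    remaining = []
    for item in items:
        if isinstance(item, CounterSketch):
            totals[(item.sub_process, item.counter_name)] += item.increment
        else:
            remaining.append(item)
    return remaining


totals: Counter = Counter()
mixed = [
    {"id": 1},
    CounterSketch("GET_DATA_FROM_SOURCE", "rows_read", 2),
    {"id": 2},
    CounterSketch("GET_DATA_FROM_SOURCE", "rows_read", 3),
]
records = split_counters(mixed, totals)
```

The returned list keeps the data records in their original order, so the caller can loop over it exactly as it would over a list that never contained counters.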
set_telemetry_source_name
Resets the telemetry source name. This is useful when the source name in the telemetry params has to be set dynamically:
from pipeline_telemetry import TelemetryMixin, add_telemetry
from settings import TELEMETRY_PARAMS


class MyDataPipeline(TelemetryMixin):

    @add_telemetry(TELEMETRY_PARAMS)
    def process_data(self, source_name: str) -> None:
        self.set_telemetry_source_name(source_name=source_name)  # (1)
        # actual data pipeline logic
When process_data() is called with a source_name argument, the call to set_telemetry_source_name (1) overrides the source name from TELEMETRY_PARAMS with that value.
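The order of events matters here: the decorator applies the default first, and the call inside the method then replaces it. The sketch below illustrates that ordering only. add_telemetry_sketch, SketchPipeline, and the telemetry_source_name attribute are hypothetical and do not reflect how pipeline_telemetry stores its parameters.

```python
import functools
from typing import Callable


def add_telemetry_sketch(default_source_name: str) -> Callable:
    """Hypothetical decorator: sets a default source name before the call."""
    def decorator(method: Callable) -> Callable:
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            # The default is applied before the wrapped method body runs.
            self.telemetry_source_name = default_source_name
            return method(self, *args, **kwargs)
        return wrapper
    return decorator


class SketchPipeline:
    def set_telemetry_source_name(self, source_name: str) -> None:
        # Replaces whatever source name the decorator set.
        self.telemetry_source_name = source_name

    @add_telemetry_sketch("default_source")
    def process_data(self, source_name: str) -> str:
        self.set_telemetry_source_name(source_name=source_name)
        return self.telemetry_source_name


used_name = SketchPipeline().process_data("crm_export")
```

Because the override happens inside the decorated method, the same pipeline class can be reused for several sources by passing a different source_name on each call.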