Process types
A process type is a mandatory attribute of a Telemetry object. The process type object defines the name of the data pipeline process as well as all subprocesses for which telemetry data can be recorded. In the example below a process is defined for retrieving weather data.
ProcessType(
    process_type='GET_WEATHER_DATA',
    subtypes=[
        'RETRIEVE_WEATHER_OBJECT_FROM_API',
        'CONVERT_TO_FORECAST',
        'STORE_FORECAST']
)
The GET_WEATHER_DATA process consists of three subprocesses, for retrieving, converting and storing weather forecast data respectively. Each of these subprocesses is allowed to have its own telemetry details.
Defining process types this way enforces a unified data model for telemetry reporting within a specific data pipeline.
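To make the idea of an enforced data model concrete, here is a minimal sketch that does not use the package itself. `ProcessTypeSketch` and `check_sub_process` are hypothetical names introduced only for this illustration; the real ProcessType class may validate subprocess names differently.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ProcessTypeSketch:
    """Hypothetical stand-in for ProcessType (not the package's class)."""
    process_type: str
    subtypes: List[str]


def check_sub_process(process: ProcessTypeSketch, sub_process: str) -> str:
    """Return sub_process if the process type declares it, else raise ValueError."""
    if sub_process not in process.subtypes:
        raise ValueError(
            f"{sub_process!r} is not a subtype of {process.process_type!r}"
        )
    return sub_process


GET_WEATHER_DATA = ProcessTypeSketch(
    process_type='GET_WEATHER_DATA',
    subtypes=[
        'RETRIEVE_WEATHER_OBJECT_FROM_API',
        'CONVERT_TO_FORECAST',
        'STORE_FORECAST'],
)

# A declared subprocess is accepted.
accepted = check_sub_process(GET_WEATHER_DATA, 'CONVERT_TO_FORECAST')

# An undeclared subprocess is rejected, so reported telemetry
# cannot drift away from the declared model.
try:
    check_sub_process(GET_WEATHER_DATA, 'SEND_EMAIL')
    rejected = False
except ValueError:
    rejected = True
```

Because every subprocess name has to appear in the declared list, all telemetry for one pipeline ends up with the same set of keys.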
Available ProcessTypes
Out of the box, pipeline_telemetry defines a few process types that can be used to store telemetry for data pipelines.
CREATE_DATA_FROM_URL, CREATE_DATA_FROM_API and CREATE_DATA_FROM_FILE
Process types aimed at retrieving data from web pages, APIs or files.
They all allow for the following subtypes:
| Sub process type | Description |
|---|---|
| RETRIEVE_RAW_DATA | Sub process to retrieve the data from its source |
| DATA_CONVERSION | Sub process to convert the data to a form in which it can be stored |
| DATA_STORAGE | Sub process to store the data that was retrieved |
UPLOAD_DATA
A process type aimed at selecting (a subset of) data and uploading it to an
external environment. This process type allows for the following subtypes:
| Sub process type | Description |
|---|---|
| DATA_SELECTION | Sub process for selecting a set of data to be uploaded |
| DATA_CONVERSION | Sub process to convert the data to a form in which it can be uploaded |
| DATA_UPLOAD | Sub process to upload the data to a specific target |
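The built-in process types and their subtypes listed above can be restated as a plain mapping, which is handy when checking which process type fits a pipeline step. This is only a restatement of the two tables; `BUILT_IN_SUBTYPES` and `process_types_with` are hypothetical names and not part of the package.

```python
# Subtypes shared by the three CREATE_DATA_FROM_* process types.
CREATE_DATA_SUBTYPES = ['RETRIEVE_RAW_DATA', 'DATA_CONVERSION', 'DATA_STORAGE']

# Hypothetical lookup table mirroring the documentation tables.
BUILT_IN_SUBTYPES = {
    'CREATE_DATA_FROM_URL': CREATE_DATA_SUBTYPES,
    'CREATE_DATA_FROM_API': CREATE_DATA_SUBTYPES,
    'CREATE_DATA_FROM_FILE': CREATE_DATA_SUBTYPES,
    'UPLOAD_DATA': ['DATA_SELECTION', 'DATA_CONVERSION', 'DATA_UPLOAD'],
}


def process_types_with(sub_process):
    """Return the names of built-in process types that declare sub_process."""
    return sorted(
        name
        for name, subtypes in BUILT_IN_SUBTYPES.items()
        if sub_process in subtypes
    )


conversion_capable = process_types_with('DATA_CONVERSION')
upload_capable = process_types_with('DATA_UPLOAD')
```

Note that DATA_CONVERSION appears in every built-in process type, while DATA_UPLOAD is specific to UPLOAD_DATA.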
Creating your own process types and subtypes
The package allows custom process types and subtypes to be defined with the ProcessType class. Be aware that custom process types need to be registered with the Telemetry class before they can be used. In the example below, the process type CUSTOM_GET_WEATHER_DATA is defined with five sub process types:
from pipeline_telemetry import ProcessType, Telemetry

GET_WEATHER_DATA = ProcessType(
    process_type='CUSTOM_GET_WEATHER_DATA',
    subtypes=[
        'RETRIEVE_WEATHER_OBJECT_FROM_API',
        'CONVERT_TO_FORECAST',
        'STORE_FORECAST',
        'CONVERT_TO_CURRENT_DAY_ACTUALS',
        'STORE_ACTUALS']
)
Registering process types
Custom process types need to be registered before they can be used in a telemetry object. This can be done with the add_process_type class method on the Telemetry class:
from pipeline_telemetry import ProcessType, Telemetry

CUSTOM_PROCESS = ProcessType(
    process_type='CUSTOM_PROCESS',
    subtypes=[
        'SUB_PROCESS_1',
        'SUB_PROCESS_2']
)

Telemetry.add_process_type(
    process_type_key='CUSTOM_PROCESS',
    process_type=CUSTOM_PROCESS
)
If multiple custom process types are defined, you can register them in bulk using the singleton class ProcessTypes. To do so, define the process types in a child class of BaseEnumerator and call the register_process_types class method on ProcessTypes with that child class as argument:
from pipeline_telemetry import BaseEnumerator, ProcessType, ProcessTypes


class WeatherDataProcessTypes(BaseEnumerator):
    """
    Class to define the process types with their subtypes for Weather data
    pipelines
    """
    GET_WEATHER_DATA = ProcessType(
        process_type='CUSTOM_GET_WEATHER_DATA',
        subtypes=[
            'RETRIEVE_WEATHER_OBJECT_FROM_API',
            'CONVERT_TO_FORECAST',
            'STORE_FORECAST',
            'CONVERT_TO_CURRENT_DAY_ACTUALS',
            'STORE_ACTUALS']
    )
    GET_CLIMATE_DATA = ProcessType(
        process_type='CUSTOM_GET_CLIMATE_DATA',
        subtypes=[
            'RETRIEVE_CLIMATE_OBJECT_FROM_API',
            'CONVERT_TO_YEARLY_CLIMATE_OBJECT',
            'STORE_YEARLY_CLIMATE']
    )


ProcessTypes.register_process_types(WeatherDataProcessTypes)
Once the register_process_types class method has been called on
ProcessTypes, all process types defined in WeatherDataProcessTypes are available via the ProcessTypes class, as this example shows:
>>> from pipeline_telemetry import ProcessTypes
>>> ProcessTypes.GET_CLIMATE_DATA
ProcessType(process_type='CUSTOM_GET_CLIMATE_DATA', subtypes=['RETRIEVE_CLIMATE_OBJECT_FROM_API', 'CONVERT_TO_YEARLY_CLIMATE_OBJECT', 'STORE_YEARLY_CLIMATE'])
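One way such bulk registration could work is by copying class attributes onto a registry class at runtime. The sketch below illustrates that assumption with stand-in classes; `ProcessTypeSketch`, `RegistrySketch` and `WeatherSketchTypes` are hypothetical, and the package's own ProcessTypes and BaseEnumerator may be implemented differently.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ProcessTypeSketch:
    """Hypothetical stand-in for ProcessType."""
    process_type: str
    subtypes: List[str]


class RegistrySketch:
    """Hypothetical stand-in for ProcessTypes."""

    @classmethod
    def register_process_types(cls, enumerator: type) -> None:
        # Copy every ProcessTypeSketch class attribute onto the registry.
        # This happens at runtime, so the attributes do not exist in the
        # registry's source code.
        for name, value in vars(enumerator).items():
            if isinstance(value, ProcessTypeSketch):
                setattr(cls, name, value)


class WeatherSketchTypes:
    """Hypothetical stand-in for a BaseEnumerator child class."""
    GET_CLIMATE_DATA = ProcessTypeSketch(
        process_type='CUSTOM_GET_CLIMATE_DATA',
        subtypes=[
            'RETRIEVE_CLIMATE_OBJECT_FROM_API',
            'CONVERT_TO_YEARLY_CLIMATE_OBJECT',
            'STORE_YEARLY_CLIMATE'],
    )


RegistrySketch.register_process_types(WeatherSketchTypes)

# The attribute only exists after registration has run.
registered = getattr(RegistrySketch, 'GET_CLIMATE_DATA')
```

Under this assumption the registered attributes are created by `setattr`, which is also why static tools that only read source code cannot know about them.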
After registration, the GET_CLIMATE_DATA process type can be used when creating
Telemetry objects:
from pipeline_telemetry import ProcessTypes, Telemetry

TELEMETRY_LOAD_CLIMATE_DATA = {
    'category': 'CLIMATE',
    'sub_category': 'MONTHLY_CLIMATE_DATA',
    'source_name': 'SOME_WEATHER_API',
    'process_type': ProcessTypes.GET_CLIMATE_DATA,
    'telemetry_rules': {}
}

telemetry_obj = Telemetry(**TELEMETRY_LOAD_CLIMATE_DATA)
You can now add telemetry to this telemetry object using the subprocesses RETRIEVE_CLIMATE_OBJECT_FROM_API, CONVERT_TO_YEARLY_CLIMATE_OBJECT and STORE_YEARLY_CLIMATE.
Registering process types using a meta class
The ProcessTypes register methods are evaluated dynamically. Linters, code completion and type checkers therefore will not recognize your custom process types, and an expression such as:
ProcessTypes.YOUR_PROCESS_TYPE
will not pass your type checker and linter.
You can solve this by defining your own ProcessTypes class based on the ProcessTypesMeta class and mixing in any class that has ProcessType class attributes:
from pipeline_telemetry import ProcessType, ProcessTypesMeta


class ProcessTypesSet1():
    CUSTOM_PROCESS = ProcessType(
        process_type='CUSTOM_PROCESS',
        subtypes=[
            'SUB_PROCESS_1',
            'SUB_PROCESS_2']
    )


class ProcessTypesSet2():
    OTHER_CUSTOM_PROCESS = ProcessType(
        process_type='OTHER_CUSTOM_PROCESS',
        subtypes=[
            'SUB_PROCESS_1',
            'SUB_PROCESS_2']
    )


class MyProcessTypes(ProcessTypesSet1, ProcessTypesSet2, metaclass=ProcessTypesMeta): ...
The MyProcessTypes class now acts as a ProcessTypes class (it is actually a subclass of ProcessTypes) with CUSTOM_PROCESS and OTHER_CUSTOM_PROCESS as ProcessType attributes:
MyProcessTypes.CUSTOM_PROCESS
MyProcessTypes.OTHER_CUSTOM_PROCESS
These attribute references now pass your linter and type checker, and autocomplete in your IDE.
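The reason the mixin approach works for static tools is that the attributes are written out in class bodies and reach the final class through ordinary inheritance. The sketch below shows this with stand-in classes. `ProcessTypeSketch` and `ProcessTypesMetaSketch` are hypothetical, and the bookkeeping the metaclass does here (collecting attributes into a `registered` dict) is an assumption, not a description of what ProcessTypesMeta actually does.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ProcessTypeSketch:
    """Hypothetical stand-in for ProcessType."""
    process_type: str
    subtypes: List[str]


class ProcessTypesMetaSketch(type):
    """Hypothetical metaclass that records inherited process types."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        found = {}
        # Walk the MRO from the most generic class to the most specific one
        # so attributes defined on subclasses win over inherited ones.
        for klass in reversed(cls.__mro__):
            for attr, value in vars(klass).items():
                if isinstance(value, ProcessTypeSketch):
                    found[attr] = value
        cls.registered = found


class SetOneSketch:
    CUSTOM_PROCESS = ProcessTypeSketch(
        process_type='CUSTOM_PROCESS',
        subtypes=['SUB_PROCESS_1', 'SUB_PROCESS_2'],
    )


class SetTwoSketch:
    OTHER_CUSTOM_PROCESS = ProcessTypeSketch(
        process_type='OTHER_CUSTOM_PROCESS',
        subtypes=['SUB_PROCESS_1', 'SUB_PROCESS_2'],
    )


class MyProcessTypesSketch(
        SetOneSketch, SetTwoSketch, metaclass=ProcessTypesMetaSketch):
    ...


# Both attributes are plain inherited class attributes, so a type checker
# can resolve them without running any code.
custom = MyProcessTypesSketch.CUSTOM_PROCESS
other = MyProcessTypesSketch.OTHER_CUSTOM_PROCESS
```

The design trade-off is that each process type has to be declared in a class body up front, in exchange for attribute access that tools can verify statically.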