Write and Run a Pipeline¶
Generally you would use an AutoML system to find pipelines, but it is useful to know how to write a pipeline by yourself, too. Moreover, once you have a pipeline you might want to explore how it works by running it.
Pipeline¶
A pipeline is described as a DAG consisting of interconnected steps, where steps can be primitives or (nested) other (sub)pipelines. A pipeline has data-flow semantics, which means that steps are not necessarily executed in the order they are listed; a step can be executed once all its inputs are available, and some steps can even be executed in parallel. On the other hand, each step can use only outputs from steps coming before it in the list of steps.
Note
The reference runtime in the core package runs pipeline steps in the order in which they are listed in the pipeline. It expects the steps to be listed in an order in which all step inputs are available from prior steps or pipeline inputs.
Pipelines have multiple representations. The core package supports pipelines as in-memory objects, JSON, and YAML. In JSON, the following is a sketch of a pipeline description:
{
    "id": <UUID of the pipeline>,
    # A URI representing a schema and version to which pipeline description conforms.
    "schema": "https://metadata.datadrivendiscovery.org/schemas/v0/pipeline.json",
    # Digest is generally computed automatically when saving a pipeline.
    "digest": <digest of the pipeline>,
    "created": <timestamp when created, in ISO 8601>,
    "name": <human friendly name of the pipeline, if it exists>,
    "description": <human friendly description of the pipeline, if it exists>,
    # A list of inputs the pipeline takes.
    "inputs": [
        {
            "name": <human friendly name of the inputs>
        }
    ],
    # A list of outputs the pipeline produces.
    "outputs": [
        {
            "name": <human friendly name of the outputs>,
            "data": <data reference, probably of an output of a step>
        }
    ],
    "steps": [
        {
            "type": "PRIMITIVE",
            "primitive": {
                "id": <ID of the primitive used in this step>,
                "version": <version of the primitive used in this step>,
                "python_path": <Python path of this primitive>,
                "name": <human friendly name of this primitive>,
                "digest": <digest of this primitive>
            },
            # Arguments are inputs to the step as a whole which are then
            # passed as necessary to the primitive's methods when called.
            "arguments": {
                "inputs": {
                    # Type defines how data is passed. CONTAINER means that it is
                    # passed as a (container) value itself, which is the most common way.
                    "type": "CONTAINER",
                    "data": <data reference, probably of an output of a step or pipeline input>
                },
                # Despite the misleading name, this is in fact the standard name of an input to the primitive,
                # e.g., representing a target column with known values for training.
                "outputs": {
                    "type": "CONTAINER",
                    "data": <data reference, probably of an output of a step or pipeline input>
                }
            },
            "outputs": [
                {
                    # Output data is made available by this step from the default "produce" method.
                    "id": "produce"
                },
                {
                    # Output data is made available by this step from an extra
                    # produce method named "produce_score", too.
                    "id": "produce_score"
                }
            ],
            # Hyper-parameters are initialization time parameters of the primitive.
            # Every primitive defines its own set of hyper-parameters it accepts,
            # the hyper-parameter class implementing the logic of a hyper-parameter,
            # and possible values the hyper-parameter can have.
            "hyperparams": {
                "column_to_operate_on": {
                    # VALUE means a constant value. "data" is in this
                    # case the value itself and not a reference.
                    "type": "VALUE",
                    # Value is converted to a JSON-compatible value by the hyper-parameter class.
                    # It also knows how to convert it back. For simple values, no conversion might happen.
                    "data": 5
                }
            }
        },
        ... more steps ...
    ]
}
Note
This sketch is not valid JSON because it contains comments and placeholders, and it makes no logical sense either. We have shown just a subset of the possible standard fields. Moreover, we used only the CONTAINER data type, while there are other data types, too. Similarly, there are other step types. To learn more, read the guide on advanced pipelines or consult the pipeline JSON schema itself.
A pipeline describes how inputs are computed into outputs. For standard pipelines, the input is a Dataset container value and the output is a Pandas DataFrame container value with predictions in the standard predictions structure.
The same pipeline is used for both fitting on train data and producing on test data.
Note
Pipelines are defined very generally: the number and meaning of pipeline inputs and outputs can be arbitrary, and even the execution semantics of the pipeline can be redefined. For our purposes we focus on standard pipelines, for which we use the execution semantics of the reference runtime.
Primitive steps describe how to run a primitive for that step: they map step inputs to primitive arguments and hyper-parameters, and step outputs to primitive produce methods. Primitives allow reuse and compositionality of logic, but the downside is that all logic then has to be in primitives, which are slightly tedious to write. Using primitives helps with reproducibility, but brings overhead when adding new logic into a pipeline if this logic does not already exist as a primitive.
Each primitive has a set of arguments it takes as a whole, combining all the arguments from all its methods. Each argument (identified by its name) can have only one value associated with it and any method accepting that argument receives that value. Once all values for all arguments for a method are available, that method can be called.
Each primitive can have multiple produce methods. These methods can be called after a primitive has been fitted. In this way a primitive can have multiple outputs, one for each produce method.
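To make the argument and produce method semantics concrete, here is a minimal sketch (not the actual runtime code) of how step arguments end up as method calls on a supervised learner primitive. It assumes attributes and targets are container DataFrames already resolved from the step's inputs and outputs data references.
from d3m import index

# Assumed placeholders: container DataFrames resolved from the step's
# "inputs" and "outputs" data references.
# attributes, targets = ...

primitive_class = index.get_primitive('d3m.primitives.regression.random_forest.SKlearn')
hyperparams_class = primitive_class.metadata.get_hyperparams()
primitive = primitive_class(hyperparams=hyperparams_class.defaults())

# The step's "inputs" and "outputs" argument values are passed to every method
# which accepts them: set_training_data takes both, produce takes only "inputs".
primitive.set_training_data(inputs=attributes, outputs=targets)
primitive.fit()
predictions = primitive.produce(inputs=attributes).value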
Note
There are also other step types possible, e.g., sub-pipelines and placeholders.
Hyper-parameters¶
Hyper-parameters are initialization time parameters of the primitive. All hyper-parameters from all primitives together form the hyper-parameters of the pipeline.
Some hyper-parameters can be provided when the pipeline is run and can differ between runs of the same pipeline, but for some hyper-parameters that makes no sense because changing them would also change the logic of the pipeline. We call the latter control hyper-parameters, while the former are generally tunable hyper-parameters. Control hyper-parameters should generally be fixed as part of the pipeline definition, leaving other hyper-parameters to be potentially provided when the pipeline is run (otherwise their default values are used). column_to_operate_on is an example of a control hyper-parameter in the pipeline sketch above.
There are also other types of hyper-parameters (e.g., to control resource usage), and hyper-parameter values can be passed as different data types, too. Moreover, every hyper-parameter belongs to a hyper-parameter class implementing its logic. Read the hyper-parameters guide for details.
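If you want to see which hyper-parameters a primitive accepts and what their default values are, you can inspect its hyper-parameters class. A minimal sketch, assuming the basic primitives used in the example below are installed:
from d3m import index

primitive_class = index.get_primitive('d3m.primitives.data_cleaning.imputer.SKlearn')
hyperparams_class = primitive_class.metadata.get_hyperparams()

# All hyper-parameters the primitive accepts, together with their hyper-parameter classes.
print(hyperparams_class.configuration)
# Default values, used for any hyper-parameter not fixed in the pipeline
# or provided when the pipeline is run.
print(hyperparams_class.defaults())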
Data References¶
Pipeline descriptions contain data references. A data reference is just a string which identifies an output of a prior step or a pipeline input. A data reference describes a data-flow connection between available data and an input to a step. It is recommended to be a string of one of the following forms:
- steps.<number>.<id>, where number identifies the step in the list of steps (0-based) and id identifies the name of a produce method of the primitive
- inputs.<number>, where number identifies the pipeline input (0-based)
- outputs.<number>, where number identifies the pipeline output (0-based)
Pipeline Description Example¶
Note
The example assumes the core package and basic primitives are installed.
The following example uses the Pipeline class to build an in-memory pipeline. This specific example creates a pipeline which trains a random forest on tabular data.
from d3m import index
from d3m.metadata.base import ArgumentType
from d3m.metadata.pipeline import Pipeline, PrimitiveStep
# Creating pipeline
pipeline_description = Pipeline()
pipeline_description.add_input(name='inputs')
# Step 0: dataset_to_dataframe
# An input to a standard pipeline is a Dataset. Here we assume it contains
# only one resource and that it is a DataFrame we extract out.
step_0 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.dataset_to_dataframe.Common'))
step_0.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='inputs.0')
step_0.add_output('produce')
pipeline_description.add_step(step_0)
# Step 1: column_parser
# All columns in DataFrames inside a Dataset are loaded as string columns. This is to
# ensure that primitives control how columns are parsed, and not logic outside of a pipeline.
# So we parse columns now, based on their types available in metadata.
step_1 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.column_parser.Common'))
step_1.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='steps.0.produce')
step_1.add_output('produce')
pipeline_description.add_step(step_1)
# Step 2: extract_columns_by_semantic_types(attributes)
# Metadata contains also semantic types which can represent column roles.
# Here we extract only attribute columns.
step_2 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common'))
step_2.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='steps.1.produce')
step_2.add_output('produce')
step_2.add_hyperparameter(
name='semantic_types',
argument_type=ArgumentType.VALUE,
data=['https://metadata.datadrivendiscovery.org/types/Attribute'],
)
pipeline_description.add_step(step_2)
# Step 3: extract_columns_by_semantic_types(targets)
# Here we extract only target columns.
step_3 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common'))
step_3.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='steps.0.produce')
step_3.add_output('produce')
step_3.add_hyperparameter(
name='semantic_types',
argument_type=ArgumentType.VALUE,
data=['https://metadata.datadrivendiscovery.org/types/TrueTarget'],
)
pipeline_description.add_step(step_3)
# Step 4: imputer
# We impute attribute columns.
step_4 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_cleaning.imputer.SKlearn'))
step_4.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='steps.2.produce')
step_4.add_output('produce')
pipeline_description.add_step(step_4)
# Step 5: random_forest
# And train a random forest on attribute and target columns.
step_5 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.regression.random_forest.SKlearn'))
step_5.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='steps.4.produce')
step_5.add_argument(name='outputs', argument_type=ArgumentType.CONTAINER, data='steps.3.produce')
step_5.add_output('produce')
pipeline_description.add_step(step_5)
# Step 6: construct_predictions
# This is a primitive which assures that the output of a standard pipeline has predictions
# in the correct structure (e.g., there is also a d3mIndex column with index for every row).
step_6 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.construct_predictions.Common'))
step_6.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data='steps.5.produce')
# This is a primitive which uses a non-standard second argument, named "reference".
step_6.add_argument(name='reference', argument_type=ArgumentType.CONTAINER, data='steps.0.produce')
step_6.add_output('produce')
pipeline_description.add_step(step_6)
# Final output
pipeline_description.add_output(name='output predictions', data_reference='steps.6.produce')
# Output to YAML
print(pipeline_description.to_yaml())
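If you want to reuse the pipeline later (the runtime example below loads it from a pipeline.yaml file), you can also write the YAML representation to a file, for example:
# Save the YAML representation to a file so the pipeline can be loaded again later.
with open('pipeline.yaml', 'w') as pipeline_file:
    pipeline_file.write(pipeline_description.to_yaml())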
As you can see, building a pipeline by hand is pretty tedious and requires one to use correct data references. Ideally, you would use other tools (e.g., an AutoML system) to build a pipeline for you. Those tools can use this API internally.
Values passed around in a D3M pipeline also contain metadata, and part of that metadata are semantic types, which can provide information about columns such as their role. A later guide explains this in more detail.
Note
Some primitives support automatically determining which columns to operate on, based on semantic types. This includes all sklearn-wrap primitives, so the example pipeline above could be simplified to not explicitly extract columns by role.
The YAML representation of this pipeline looks like this:
created: '2021-02-25T21:04:43.399478Z'
digest: d9a06fbd2ba3f7771e703a6a3e455379c692cc4291904d44f86db07f3a5210f2
id: e70b61e2-6fcd-470b-becc-d3eba7041ab8
inputs:
- name: inputs
outputs:
- data: steps.6.produce
  name: output predictions
schema: https://metadata.datadrivendiscovery.org/schemas/v0/pipeline.json
steps:
- arguments:
    inputs:
      data: inputs.0
      type: CONTAINER
  outputs:
  - id: produce
  primitive:
    digest: aed657e5effa3e313bd0e59c7334100aa8552fc5aba762a959ce4569284a5e63
    id: 4b42ce1e-9b98-4a25-b68e-fad13311eb65
    name: Extract a DataFrame from a Dataset
    python_path: d3m.primitives.data_transformation.dataset_to_dataframe.Common
    version: 0.3.0
  type: PRIMITIVE
- arguments:
    inputs:
      data: steps.0.produce
      type: CONTAINER
  outputs:
  - id: produce
  primitive:
    digest: 6f73dc863e2cfcbed90757ab26c34ca8df23e24f9a26632f48dc228f2277dc7b
    id: d510cb7a-1782-4f51-b44c-58f0236e47c7
    name: Parses strings into their types
    python_path: d3m.primitives.data_transformation.column_parser.Common
    version: 0.6.0
  type: PRIMITIVE
- arguments:
    inputs:
      data: steps.1.produce
      type: CONTAINER
  hyperparams:
    semantic_types:
      data:
      - https://metadata.datadrivendiscovery.org/types/Attribute
      type: VALUE
  outputs:
  - id: produce
  primitive:
    digest: 88f0780f5324d4a881d5d51e29f33fdcdc6d2968acf3b927032cf2d832e10504
    id: 4503a4c6-42f7-45a1-a1d4-ed69699cf5e1
    name: Extracts columns by semantic type
    python_path: d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common
    version: 0.4.0
  type: PRIMITIVE
- arguments:
    inputs:
      data: steps.0.produce
      type: CONTAINER
  hyperparams:
    semantic_types:
      data:
      - https://metadata.datadrivendiscovery.org/types/TrueTarget
      type: VALUE
  outputs:
  - id: produce
  primitive:
    digest: 88f0780f5324d4a881d5d51e29f33fdcdc6d2968acf3b927032cf2d832e10504
    id: 4503a4c6-42f7-45a1-a1d4-ed69699cf5e1
    name: Extracts columns by semantic type
    python_path: d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common
    version: 0.4.0
  type: PRIMITIVE
- arguments:
    inputs:
      data: steps.2.produce
      type: CONTAINER
  outputs:
  - id: produce
  primitive:
    digest: 84bf94c87a745011023da7074c65e1cee1272843d5a11cce1c64c7f20d42e408
    id: d016df89-de62-3c53-87ed-c06bb6a23cde
    name: sklearn.impute.SimpleImputer
    python_path: d3m.primitives.data_cleaning.imputer.SKlearn
    version: 2020.12.1
  type: PRIMITIVE
- arguments:
    inputs:
      data: steps.4.produce
      type: CONTAINER
    outputs:
      data: steps.3.produce
      type: CONTAINER
  outputs:
  - id: produce
  primitive:
    digest: 79111615e8d956499bd1c2a8ee16575379da4e666861979ef293ba408f417549
    id: f0fd7a62-09b5-3abc-93bb-f5f999f7cc80
    name: sklearn.ensemble.forest.RandomForestRegressor
    python_path: d3m.primitives.regression.random_forest.SKlearn
    version: 2020.12.1
  type: PRIMITIVE
- arguments:
    inputs:
      data: steps.5.produce
      type: CONTAINER
    reference:
      data: steps.0.produce
      type: CONTAINER
  outputs:
  - id: produce
  primitive:
    digest: 7ecceddd6bf78f4a8b0719f1aff46fe2e549c0b4b096be035513a92bdb6510de
    id: 8d38b340-f83f-4877-baaa-162f8e551736
    name: Construct pipeline predictions output
    python_path: d3m.primitives.data_transformation.construct_predictions.Common
    version: 0.3.0
  type: PRIMITIVE
The core package populated more information about the primitives used and computed digests. Because a pipeline ID was not provided, it was auto-generated, too. If you prefer, you can also write pipelines in YAML or JSON directly.
Note
Digest values will be different for you if you run the code above, because you will probably have a different version of primitives installed and at least a different created timestamp for the pipeline.
Reference Runtime¶
d3m.runtime
module contains a reference runtime for pipelines. There
is also an extensive command line interface (CLI) you can access
through python3 -m d3m runtime
.
The reference runtime runs the pipeline twice, in two phases: first fitting the pipeline and then producing. During fitting, each primitive is first fitted and then produced on train data, in the order of the steps. During producing, each primitive is produced on test data. Before each phase, the reference runtime sets the target column role semantic type on the target column(s) based on the provided problem description. This is how information from the problem description is passed to the pipeline and its primitives.
Note
We choose to use the term producing and not predicting because producing encompasses both predicting and transforming.
The d3m.runtime module exposes both a low-level Runtime class and high-level functions like fit and produce. We can use those high-level functions with the pipeline we made above and an example dataset:
import sys
from d3m import runtime
from d3m.container import dataset
from d3m.metadata import base as metadata_base, pipeline, problem
# Loading problem description.
problem_description = problem.get_problem('datasets/training_datasets/seed_datasets_archive/185_baseball/185_baseball_problem/problemDoc.json')
# Loading train and test datasets.
train_dataset = dataset.get_dataset('datasets/training_datasets/seed_datasets_archive/185_baseball/TRAIN/dataset_TRAIN/datasetDoc.json')
test_dataset = dataset.get_dataset('datasets/training_datasets/seed_datasets_archive/185_baseball/TEST/dataset_TEST/datasetDoc.json')
# Loading pipeline description from the YAML representation.
# We could also just use the in-memory object we made above.
pipeline_description = pipeline.get_pipeline('pipeline.yaml')
# Fitting pipeline on train dataset.
fitted_pipeline, train_predictions, fit_result = runtime.fit(
pipeline_description,
[train_dataset],
problem_description=problem_description,
context=metadata_base.Context.TESTING,
)
# Any errors from running the pipeline are captured and stored in
# the result objects (together with any values produced until then and
# pipeline run information). Here we just want to know if it succeeded.
fit_result.check_success()
# Producing predictions using the fitted pipeline on test dataset.
test_predictions, produce_result = runtime.produce(
fitted_pipeline,
[test_dataset],
)
produce_result.check_success()
test_predictions.to_csv(sys.stdout)
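If you need more control over execution, the low-level Runtime class supports the same flow. The following is a minimal sketch, assuming the objects loaded in the example above (argument names follow recent core package versions):
from d3m import runtime
from d3m.metadata import base as metadata_base

# Construct the runtime for a standard pipeline.
run = runtime.Runtime(
    pipeline_description,
    problem_description=problem_description,
    context=metadata_base.Context.TESTING,
    is_standard_pipeline=True,
)

# Fit on the train dataset, then produce on the test dataset.
fit_result = run.fit([train_dataset])
fit_result.check_success()

produce_result = run.produce([test_dataset])
produce_result.check_success()

# Exposed values are keyed by data reference; pipeline outputs use "outputs.<number>".
test_predictions = produce_result.values['outputs.0']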
To do the same using the CLI, you can run:
$ python3 -m d3m runtime fit-produce \
--pipeline pipeline.yaml \
--problem datasets/training_datasets/seed_datasets_archive/185_baseball/185_baseball_problem/problemDoc.json \
--input datasets/training_datasets/seed_datasets_archive/185_baseball/TRAIN/dataset_TRAIN/datasetDoc.json \
--test-input datasets/training_datasets/seed_datasets_archive/185_baseball/TEST/dataset_TEST/datasetDoc.json \
--output predictions.csv \
--output-run pipeline_run.yaml
For more information about the usage, see the CLI guide or run:
$ python3 -m d3m runtime --help
The d3m.runtime module also provides other high-level functions which can help with data preparation (splitting) and scoring for evaluating pipelines.
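For example, the fit-score CLI command fits a pipeline, produces predictions, and scores them in one call. The following invocation is only a sketch: the SCORE dataset path is an assumption about the dataset layout and exact flags can differ between core package versions, so confirm them with python3 -m d3m runtime fit-score --help:
$ python3 -m d3m runtime fit-score \
--pipeline pipeline.yaml \
--problem datasets/training_datasets/seed_datasets_archive/185_baseball/185_baseball_problem/problemDoc.json \
--input datasets/training_datasets/seed_datasets_archive/185_baseball/TRAIN/dataset_TRAIN/datasetDoc.json \
--test-input datasets/training_datasets/seed_datasets_archive/185_baseball/TEST/dataset_TEST/datasetDoc.json \
--score-input datasets/training_datasets/seed_datasets_archive/185_baseball/SCORE/dataset_SCORE/datasetDoc.json \
--scores scores.csv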
To better understand how error handling is done in the reference runtime and how you can
debug your primitives and pipelines, read this HOWTO.
The fit_result and produce_result objects above (of the Result class) contain values which were asked to be retained and exposed during the pipeline's execution (by default only the pipeline's outputs are retained). You can control which values are exposed by using the expose_produced_outputs and outputs_to_expose arguments of the high-level functions, or the --expose-produced-outputs CLI argument.
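For example, a sketch using the same objects as above which also retains the imputer step's output in addition to the pipeline outputs:
fitted_pipeline, train_predictions, fit_result = runtime.fit(
    pipeline_description,
    [train_dataset],
    problem_description=problem_description,
    context=metadata_base.Context.TESTING,
    # Data references of values to retain and expose in the result.
    outputs_to_expose=['outputs.0', 'steps.4.produce'],
)
fit_result.check_success()

# The imputed attribute columns produced during fitting.
imputed_attributes = fit_result.values['steps.4.produce']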
Those objects also contain pipeline run information. We saved it to a file in the CLI call with the --output-run argument, too. To learn more about pipeline run information, read the next section.
Pipeline Run¶
The PipelineRun class represents a pipeline run. The pipeline run contains information about many aspects of the pipeline's execution and enables metalearning and reproducibility, allowing the execution to be duplicated at a later time.
All information for the pipeline run is automatically collected during the pipeline's execution. Moreover, it also references the pipeline used, the problem description, and the input dataset. It contains hyper-parameter values provided when the pipeline was run and information about the environment inside which the pipeline was run.
In the example above, we saved it to a file using the --output-run argument. We represent pipeline runs in YAML because they can contain multiple documents, one for each execution phase.
The following is a sketch of the pipeline run representation:
context: <the context for this phase (TESTING for example)>
datasets:
- digest: <digest of the input dataset>
  id: <ID of the input dataset>
end: <timestamp of when this phase ended>
environment: <details about the machine the phase was performed on>
id: <UUID of this pipeline run document>
pipeline:
  digest: <digest of the pipeline>
  id: <ID of the pipeline>
problem:
  digest: <digest of the problem>
  id: <ID of the problem>
random_seed: <random seed value, 0 by default>
run:
  is_standard_pipeline: true
  phase: FIT
  results: <predictions of the fit phase>
schema: https://metadata.datadrivendiscovery.org/schemas/v0/pipeline_run.json
start: <timestamp of when this phase started>
status:
  state: <whether this phase completed successfully or not>
steps: <details of running each step: hyper-parameters, timestamps, success, etc.>
--- <this indicates a divider between documents in YAML>
... documents for other phases ...
A pipeline run can also contain information about data preparation and scoring which might have been done before and after the pipeline was run. In the case of scoring, the pipeline run also contains the computed scores.
Having a pipeline run allows you to rerun the pipeline at a later time, reproducing its results. Being able to rerun a pipeline is a critical step towards metalearning.
Now that we know how to write a pipeline and run it, we might want to add some custom logic to the pipeline. For that, we have to write our own primitive.