Schematic data pipelines in Python
This is a package for writing robust data-science and data-engineering pipelines in Python 3. Thanks for checking it out.
A major challenge in building a robust data pipeline is guaranteeing interoperability between pipes: data transformations often change the underlying data representation (e.g. change a column's type, add columns, convert a PySpark DataFrame to a Pandas or H2O DataFrame). This makes it difficult to track what exactly is going on at a given point of the pipeline, and debugging a single pipe often requires running the whole pipeline up to that point.
This package declares a simple API to define data transformations that know what schema they require to run, what schema they return, and what states they depend on.
Under this API, you define a `Pipe` as follows (an example):

```python
import numpy as np

from schemaflow import pipe, types


class MyPipe(pipe.Pipe):
    requirements = {'sklearn'}

    fit_requires = {
        # (arbitrary items, arbitrary features)
        'x': types.Array(np.float64, shape=(None, None)),
        'y': types.List(float)
    }
    transform_requires = {
        'x': types.List(float)
    }

    fit_parameters = {
        'gamma': float
    }
    # parameter assigned in fit; the pipe's state
    fitted_parameters = {
        'a': float
    }
    # type and key of the transformed data
    transform_modifies = {
        'b': float
    }

    def fit(self, data, parameters=None):
        # accesses data['x'], data['y'] and parameters['gamma'];
        # expects the types declared above; assigns a float to self['a']
        ...

    def transform(self, data):
        # assigns a float to data['b']
        return data
```
Without reading or executing `fit` and `transform`, we know how `data` will flow through this pipe:

- it requires an `'x'` and a `'y'` in the data and a parameter `gamma` in `fit`
- it is stateful through `a`
- it transforms `data['b']`

This allows checking whether a `Pipeline` is consistent without executing `fit` or `transform` of any pipe.
Specifically, you can execute the pipe using the traditional fit-transform idiom:

```python
p = MyPipe()
p.fit(train_data, {'gamma': 1.0})
result = p.transform(test_data)
```
but also check whether the data format that you pass is consistent with its requirements:

```python
p = MyPipe()

exceptions_fit = p.check_fit({'x': 1}, {'gamma': 1.0})
assert len(exceptions_fit) > 0

exceptions_transform = p.check_transform({'x': 1})
assert len(exceptions_transform) > 0
```

which executes neither `fit` nor `transform`.
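The value of `check_*` is that it needs only each value's type, never the data itself. As a rough illustration of the idea in plain Python (a hypothetical sketch, not schemaflow's implementation; `check_schema` is an invented name and plain Python types stand in for schemaflow's `Type`s):

```python
def check_schema(expected_schema, data):
    """Compare a declared schema against actual data, returning a list of
    error strings rather than raising, mirroring the list-of-exceptions
    style of check_fit/check_transform."""
    errors = []
    for key, expected_type in expected_schema.items():
        if key not in data:
            errors.append(f"missing key: {key!r}")
        elif not isinstance(data[key], expected_type):
            errors.append(
                f"wrong type for {key!r}: expected {expected_type.__name__}, "
                f"got {type(data[key]).__name__}"
            )
    return errors

# analogous to MyPipe's transform_requires, with plain Python types
schema = {'x': list}
assert check_schema(schema, {'x': 1}) != []     # wrong type, as in the example above
assert check_schema(schema, {'x': [1.0]}) == []
```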
The biggest advantage of this declaration is that when the pipes are used within a pipeline, Schemaflow can compute how the schema flows through it, and therefore knows the schema flow of the whole `Pipeline`:

```python
p = schemaflow.pipeline.Pipeline([
    ('fix_ids', PipeA()),
    ('join_tables_with_fix', PipeB()),
    ('featurize', Featurize1_Pipeline()),
    ('model', Model1_Pipeline()),
    ('export_metrics', Export_results_PDF_Pipeline()),
    ('push_to_cache', PushResultsToCache())
])

print(p.transform_modifies)
```

In other words, because we know how each `Pipe` modifies the schema, we can compute how the schema flows through the pipeline, and therefore obtain what `p` depends on and what it transforms.
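Conceptually, the pipeline's schema flow is just a fold of each pipe's declared modifications over the initial schema. A minimal sketch of that idea with plain dictionaries (`flow_schema` is a hypothetical helper, not the schemaflow API):

```python
def flow_schema(initial_schema, pipes_modifies):
    """Apply each pipe's declared transform_modifies in sequence and
    return the schema available after the last pipe."""
    schema = dict(initial_schema)
    for modifies in pipes_modifies:
        schema.update(modifies)
    return schema

# e.g. a featurize step adds 'features' and a model step adds 'prediction'
final = flow_schema(
    {'x': float},
    [{'features': list}, {'prediction': float}],
)
assert final == {'x': float, 'features': list, 'prediction': float}
```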
When to use this package
Use it when you are fairly certain that:
- you need a complex data pipeline (e.g. more than one data source and different data types)
- the data transforms are expensive (e.g. Spark, Hive, SQL)
- your data pipeline aims to be maintainable and reusable (e.g. production code)
Pipe

class `schemaflow.pipe.Pipe`

A `Pipe` represents a stateful data transformation.

Data in this context is a Python dictionary in which each value is some representation of data, either in-memory (e.g. `float`, `pandas.DataFrame`) or remote (e.g. `pyspark.sql.DataFrame`, `sqlalchemy`).

A `Pipe` is defined by:

- a method `transform()` that:
  - uses the keys `transform_requires` from `data`
  - uses the `state`
  - modifies the keys `transform_modifies` in `data`
- a method `fit()` that:
  - uses the (training) keys `fit_requires` from `data`
  - uses the (passed) `fit_parameters`
  - modifies the keys `fitted_parameters` in `state`
- a set of `requirements` (a set of package names, e.g. `{'pandas'}`) of the transformation

All `transform_modifies` and `fit_requires` entries have a `Type` that can be used to check that the Pipe's input is consistent, via `check_fit()` and `check_transform()`. The existence of the requirements can be checked using `check_requirements()`.

The rationale is that you can run `check_*` with access only to the data's schema. This is especially important when the transformation itself is an expensive operation.
- `requirements = set()`

  The set of packages required by the Pipe.

- `transform_requires = {}`

  The data schema required in `transform()`; a dictionary `str: Type`.

- `transform_modifies = {}`

  The schema modifications that `transform()` applies to the data; a dictionary `str: Type`.

- `state = None`

  A dictionary with the state of the Pipe. Use the `[]` operator to access and modify it.
- `check_requirements()`

  Checks for missing requirements.

  Returns: a list of exceptions with missing requirements.
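Checking whether a set of package names is importable can be done with the standard library alone; a sketch of the idea (`missing_requirements` is an invented name, not schemaflow's code):

```python
import importlib.util


def missing_requirements(requirements):
    """Return the subset of package names that cannot be imported."""
    return {name for name in requirements
            if importlib.util.find_spec(name) is None}


assert missing_requirements({'json'}) == set()  # stdlib module, always present
assert missing_requirements({'not_a_real_package_xyz'}) == {'not_a_real_package_xyz'}
```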
- `check_fit(data: dict, parameters: dict = None, raise_: bool = False)`

  Checks that the given data has a valid schema to be used in `fit()`.

  Parameters:

  - data – a dictionary of pairs, either `(str, Type)` or `(str, instance)`
  - parameters – a dictionary of pairs, either `(str, Type)` or `(str, instance)`
  - raise_ – whether to raise the first found exception or list them all (default: list them all)

  Returns: a list of (subclasses of) `schemaflow.exceptions.SchemaFlowError` with all failed checks.
- `check_transform(data: dict, raise_: bool = False)`

  Checks that the given data has a valid schema to be used in `transform()`.

  Parameters:

  - data – a dictionary of pairs, either `(str, Type)` or `(str, instance)`
  - raise_ – whether to raise the first found exception or list them all (default: list them all)

  Returns: a list of (subclasses of) `schemaflow.exceptions.SchemaFlowError` with all failed checks.
- `transform_schema(schema: dict)`

  Transforms `schema` into a new schema based on `transform_modifies`.

  Parameters: schema – a dictionary of pairs `str: Type`.

  Returns: the new schema.
- `fit(data: dict, parameters: dict = None)`

  Modifies the instance's `state`.

  Parameters:

  - data – a dictionary of pairs `(str, object)`
  - parameters – a dictionary of pairs `(str, object)`

  Returns: None
- `transform(data: dict)`

  Modifies the keys of `data` identified in `transform_modifies`.

  Parameters: data – a dictionary of pairs `(str, object)`.

  Returns: the modified data.
Pipeline

class `schemaflow.pipeline.Pipeline(pipes)`

A list of `Pipe`'s that are applied sequentially. A `Pipeline` is itself a `Pipe` and can be part of another `Pipeline`.

Parameters: pipes – a list or `OrderedDict` of `Pipe`'s. If passed as a list, you can pass either pipes or tuples `(name, Pipe)`.

- `pipes = None`

  An `OrderedDict` whose keys are the pipes' names (or `str(index)`, where `index` is the pipe's position in the sequence) and whose values are `Pipe`'s.
- `transform_requires`

  The data schema required in `transform()`.

- `transform_modifies`

  The schema modifications that this Pipeline applies in `transform()`. When a key is modified more than once, the changes are appended as a list.
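Assuming that behavior, merging the per-pipe declarations could look like the following plain-Python sketch (`merge_modifies` is an invented name, not schemaflow's code):

```python
def merge_modifies(pipes_modifies):
    """Combine per-pipe transform_modifies declarations; a key modified by
    more than one pipe maps to the list of successive modifications."""
    merged = {}
    for modifies in pipes_modifies:
        for key, value in modifies.items():
            if key in merged:
                if not isinstance(merged[key], list):
                    merged[key] = [merged[key]]  # promote to a list on the second change
                merged[key].append(value)
            else:
                merged[key] = value
    return merged


merged = merge_modifies([{'b': float}, {'b': str}, {'c': int}])
assert merged == {'b': [float, str], 'c': int}
```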
- `fitted_parameters`

  The parameters assigned in `fit` of each pipe.

  Returns: a dictionary mapping each pipe's name to its respective `fitted_parameters`.

- `requirements`

  The set of packages required by the Pipeline: the union of the `requirements` of all pipes in the Pipeline.
- `check_transform(data: dict = None, raise_: bool = False)`

  Checks that the given data has a valid schema to be used in `transform()`.

  Parameters:

  - data – a dictionary of pairs, either `(str, Type)` or `(str, instance)`
  - raise_ – whether to raise the first found exception or list them all (default: list them all)

  Returns: a list of (subclasses of) `schemaflow.exceptions.SchemaFlowError` with all failed checks.
- `check_fit(data: dict, parameters: dict = None, raise_: bool = False)`

  Checks that the given data has a valid schema to be used in `fit()`.

  Parameters:

  - data – a dictionary of pairs, either `(str, Type)` or `(str, instance)`
  - parameters – a dictionary of pairs, either `(str, Type)` or `(str, instance)`
  - raise_ – whether to raise the first found exception or list them all (default: list them all)

  Returns: a list of (subclasses of) `schemaflow.exceptions.SchemaFlowError` with all failed checks.
- `transform(data: dict)`

  Applies each pipe's `transform()` sequentially to `data`.

  Parameters: data – a dictionary of pairs `(str, object)`.

  Returns: the transformed data.
- `transform_schema(schema: dict)`

  Transforms `schema` into a new schema based on `transform_modifies`.

  Parameters: schema – a dictionary of pairs `str: Type`.

  Returns: the new schema.
- `fit(data: dict, parameters: dict = None)`

  Fits the `pipes` in sequence: `p1.fit`, `p1.transform`, `p2.fit`, `p2.transform`, …, `pN.transform`.

  Parameters:

  - data – a dictionary of pairs `(str, object)`
  - parameters – a dictionary `{pipe_name: {str: object}}`, where each value contains the parameters to pass to the pipe named `pipe_name`

  Returns: None
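The interleaved order described above (fit a pipe, transform the data, then fit the next pipe on the transformed data) can be sketched as follows; `pipeline_fit` and the toy `AddOne` pipe are illustrations, not schemaflow code:

```python
def pipeline_fit(pipes, data, parameters=None):
    """Fit pipes in sequence: each pipe is fitted on the output of the
    previous pipe's transform, i.e. p1.fit, p1.transform, p2.fit, ..."""
    parameters = parameters or {}
    for name, p in pipes.items():
        p.fit(data, parameters.get(name))
        data = p.transform(data)


class AddOne:
    """Toy stand-in for a Pipe: records what it was fitted on, shifts 'x'."""
    def fit(self, data, parameters=None):
        self.seen = data['x']

    def transform(self, data):
        return {**data, 'x': data['x'] + 1}


first, second = AddOne(), AddOne()
pipeline_fit({'first': first, 'second': second}, {'x': 0})
assert (first.seen, second.seen) == (0, 1)  # the second pipe is fitted on transformed data
```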
- `logged_transform(data: dict)`

  Performs the same operation as `transform()` while logging the schema at each intermediate step. It also logs schema inconsistencies as errors: for each pipe, it checks whether the pipe's input data is consistent with its `transform_requires` and whether its output data is consistent with its `transform_modifies`. This greatly helps the pipeline developer identify problems in the pipeline.

  Parameters: data – a dictionary of pairs `(str, object)`.

  Returns: the transformed data.
- `logged_fit(data: dict, parameters: dict = None)`

  Performs the same operation as `fit()` while logging the schema at each intermediate step. It also logs schema inconsistencies as errors: for each pipe, it checks whether the pipe's input data is consistent with its `fit_requires` and whether its state changes are consistent with its `fitted_parameters`. This greatly helps the pipeline developer identify problems in the pipeline.

  Parameters:

  - data – a dictionary of pairs `(str, object)`
  - parameters – a dictionary of pairs `(str, object)`

  Returns: None
Types

class `schemaflow.types.Type`

The base type of all types, used to declare new types for use in `schemaflow.pipe.Pipe`. The class attribute `requirements` (a set of strings) defines whether using this type has package requirements (e.g. numpy).

- `requirements = {}`

  The set of packages required for this type to be usable.

- classmethod `base_type()`

  A class property that returns the underlying type of this `Type`.
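A new type follows the shape sketched below. This is a self-contained illustration with a minimal stand-in for `Type` (the real base class lives in `schemaflow.types`, and `FloatType` is an invented example):

```python
class Type:
    """Minimal stand-in for schemaflow.types.Type."""
    requirements = set()  # package names this type needs, e.g. {'numpy'}

    @classmethod
    def base_type(cls):
        """The underlying Python type this Type represents."""
        raise NotImplementedError


class FloatType(Type):
    """A hypothetical Type wrapping Python's float."""
    @classmethod
    def base_type(cls):
        return float


assert FloatType.base_type() is float
assert FloatType.requirements == set()  # no extra packages needed
```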
class `schemaflow.types.PySparkDataFrame(schema: dict)`

Representation of a `pyspark.sql.DataFrame`. Requires `pyspark`.

class `schemaflow.types.PandasDataFrame(schema: dict)`

Representation of a `pandas.DataFrame`. Requires `pandas`.
Exceptions

exception `schemaflow.exceptions.SchemaFlowError(locations: list = None)`

The base exception of SchemaFlow.

exception `schemaflow.exceptions.NotFittedError(pipe, key, locations: list = None)`

`SchemaFlowError` raised when a non-fitted parameter is accessed.

exception `schemaflow.exceptions.MissingRequirement(object_type: type, requirement: str, locations: list = None)`

`SchemaFlowError` raised when a requirement is missing.

exception `schemaflow.exceptions.WrongSchema(expected_columns, passed_columns, locations: list = None)`

`SchemaFlowError` raised when the schema of a datum is wrong (e.g. wrong shape).

exception `schemaflow.exceptions.WrongParameter(expected_columns, passed_columns, locations: list = None)`

`SchemaFlowError` raised when unexpected parameters are passed to `fit`.

exception `schemaflow.exceptions.WrongType(expected_type, base_type, locations: list = None)`

`SchemaFlowError` raised when the type of a datum is wrong.

exception `schemaflow.exceptions.WrongShape(expected_shape, shape, locations: list = None)`

`SchemaFlowError` raised when the shape of a datum is wrong.