streamsx.pmml package

PMML integration for IBM Streams

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:


Provides functions to score input records using PMML models and to interact with the Watson Machine Learning (WML) repository.


The following simple Streams application uses the model_feed() and score() functions:

import datetime

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema, StreamSchema
from streamsx.topology.context import submit
import streamsx.pmml as pmml

topo = Topology()
# IBM Cloud Machine Learning service credentials
credentials = '{"url" : "xxx", "instance_id" : "icp"}'
models = pmml.model_feed(topo, connection_configuration=credentials, model_name="sample_pmml", polling_period=datetime.timedelta(minutes=5))
# sample with a single model predictor field
s = topo.source(['first tuple', 'second tuple']).as_string()
out_schema = StreamSchema('tuple<rstring string, rstring result>')
res = pmml.score(s, schema=out_schema, model_input_attribute_mapping='sample_predictor=string', model_stream=models, raw_result_attribute_name='result', initial_model_provisioning_timeout=datetime.timedelta(minutes=1))
# Use for IBM Streams including IBM Cloud Pak for Data
submit('DISTRIBUTED', topo)
# Use for IBM Streaming Analytics service in IBM Cloud
# submit('STREAMING_ANALYTICS_SERVICE', topo)

streamsx.pmml.score(stream, schema, model_input_attribute_mapping, model_output_attribute_mapping=None, model_stream=None, model_path=None, success_attribute_name=None, error_reason_attribute_name=None, raw_result_attribute_name=None, wml_meta_data_attribute_name=None, initial_model_provisioning_timeout=None, name=None)

Uses the PMMLScoring operator to score tuple data.

The PMMLScoring operator scores the tuples it receives on its first input port by mapping input attributes to the predictors of a configurable PMML model. The predicted value (score) is sent to the output port together with the original input tuple and some model metadata. The mapping of model output fields to output stream attributes can be configured. The model can be loaded from a file when the operator starts. In addition, model data in PMML format can be sent to the second input port, which allows the model to be updated at runtime.

  • stream (Stream) – Stream of tuples containing the records to be scored.

  • schema (Schema) – Schema of the output stream.

  • model_input_attribute_mapping (str) – Maps input stream attributes to predictors in the format predictorName1=streamsAttribute1,predictorName2=streamsAttribute2,...

  • model_output_attribute_mapping (str) – Maps output stream attributes to model output fields in the format streamsAttribute1=modelOutputField1,streamsAttribute2=modelOutputField2,...

  • model_stream (Stream) – Stream of tuples containing new model versions and model metadata. The tuples must have the type produced by model_feed, so connect the output stream of model_feed to this port.

  • model_path (str) – The path to a local model file, which must be in PMML format. This model is loaded when the operator starts and is used for scoring until a new model arrives at the second input port of the operator. Metadata such as name and version cannot be specified for this model, so the metadata-related attributes on the output port are set to ‘unknown’ for as long as this model is in use.

  • success_attribute_name (str) – The name of an output stream attribute of type ‘boolean’. If set, the result of the scoring operation is stored in this attribute. The value is ‘true’ if the scoring succeeded and ‘false’ if an error occurred.

  • error_reason_attribute_name (str) – The name of an output stream attribute of type ‘rstring’. If set, an error description is stored in this attribute when the scoring operation fails. If the scoring operation succeeds, an empty string is stored in the attribute.

  • raw_result_attribute_name (str) – Use this parameter to get the model output as a JSON string. It specifies the name of an output attribute of type ‘rstring’ that receives the JSON string. The JSON structure is an array in which each entry represents a row returned by the model after scoring the input record. Each entry contains the returned value and the ResultDescriptor, which holds all metadata about the entry.

  • wml_meta_data_attribute_name (str) – The name of an output stream attribute of type ‘map<rstring,rstring>’. If set, the map contains the metadata fetched from the WML repository by the WMLModelFeed operator. This operator passes the data through unchanged, for debugging and reference purposes. If the model was loaded using the model_path parameter instead of from the WML repository, the map is empty.

  • initial_model_provisioning_timeout (int|datetime.timedelta) – Setting this parameter causes the operator to wait for the initial model to be loaded. If the model_path parameter is not used, no initial model is loaded from a file during operator startup. In that case the operator sends tuples to the output port without scoring them and sets the error indicator instead. To allow time for the model to be loaded from the WML repository, set this parameter to the maximum wait time, either as a number of seconds (‘int’) or as a ‘datetime.timedelta’. If the model is not loaded within this interval, the operator aborts.

  • name (str) – Operator name in the Streams context, defaults to a generated name.


Output Stream with specified schema
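
The two mapping parameters are plain comma-separated strings, which are easy to mistype by hand. The following is a minimal sketch of building them from dictionaries. The helper build_mapping is hypothetical and not part of streamsx.pmml, the output field name is illustrative, and the rule that names may not contain ‘=’ or ‘,’ is an assumption derived from the separator characters.

```python
def build_mapping(pairs):
    """Join a dict into the 'left1=right1,left2=right2' form described above.

    Hypothetical helper for illustration, not part of streamsx.pmml.
    """
    for left, right in pairs.items():
        for part in (left, right):
            # Assumption: '=' and ',' act as separators, so a name
            # containing either of them would make the mapping ambiguous.
            if not part or '=' in part or ',' in part:
                raise ValueError('invalid name in mapping: %r' % (part,))
    return ','.join('%s=%s' % (left, right) for left, right in pairs.items())


# Input mapping: model predictor name -> input stream attribute
input_mapping = build_mapping({'sample_predictor': 'string'})

# Output mapping: output stream attribute -> model output field
# ('predicted_value' is an illustrative field name)
output_mapping = build_mapping({'result': 'predicted_value'})
```

The resulting strings can then be passed as model_input_attribute_mapping and model_output_attribute_mapping.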

streamsx.pmml.model_feed(topology, connection_configuration, model_name=None, model_uid=None, polling_period=None, name=None)

Downloads a Machine Learning (ML) model from the IBM Cloud Machine Learning service for use as input to the PMML score() function.

Models can be created and trained in Watson Studio or by using notebooks.

  • topology (Topology) – Topology to contain the returned stream.

  • connection_configuration (dict,str) – The credentials of the IBM Cloud Machine Learning service in JSON, or the name of an application configuration.

  • model_name (str) – A model in the WML repository can be referenced by its name or by its UID. Names in the WML repository are not guaranteed to be unique: different models may have the same name, and the only unique identifier is the model UID. Using the name may be more convenient because the UID is a long digit string, but make sure the name is unique in the WML repository. If a name is not unique, the operator uses the first model that matches it. Use either the model_name parameter or the model_uid parameter. If both are given, model_name is ignored.

  • model_uid (str) – The UID that uniquely identifies a model in the WML repository. The UID stays the same when the model is updated with a new version. Use either the model_name parameter or the model_uid parameter. If both are given, model_name is ignored.

  • polling_period (int|datetime.timedelta) – The interval between calls to the WML repository. Specify the value in seconds when using the ‘int’ type, or as a ‘datetime.timedelta’.

  • name (str) – Source name in the Streams context, defaults to a generated name.
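
Both polling_period and initial_model_provisioning_timeout accept either a number of seconds or a datetime.timedelta. The sketch below only illustrates that the two forms describe the same interval. The helper to_seconds is hypothetical, and how the package converts the value internally is not shown here.

```python
import datetime


def to_seconds(period):
    """Return a period given as int seconds or datetime.timedelta, in seconds.

    Hypothetical helper for illustration, not part of streamsx.pmml.
    """
    if isinstance(period, datetime.timedelta):
        return int(period.total_seconds())
    return int(period)


# Two ways of writing the same five-minute polling interval
as_timedelta = datetime.timedelta(minutes=5)
as_int = 300
same_interval = to_seconds(as_timedelta) == to_seconds(as_int)
```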


Object names stream with schema

Return type

