Environments

OPF Task Driver

This script is part of the Online Prediction Framework (OPF) suite. It implements the TaskDriver for an OPF experiment.

It’s used by OPF RunExperiment and may also be useful for swarming’s HyperSearch Worker.

The TaskDriver is a simple state machine that:

  1. Accepts incoming records from the client one data row at a time.
  2. Cycles through phases in the requested iteration cycle; the phases may include any combination of learnOnly, inferOnly, and learnAndInfer.
  3. For each data row, generates an OPF Model workflow that corresponds to the current phase in the iteration cycle and the requested inference types.
  4. Emits inference results via the user-supplied PredictionLogger.
  5. Gathers requested inference metrics.
  6. Invokes user-provided callbacks (setup, postIter, finish).
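
The phase cycling in step 2 can be illustrated with a small, self-contained sketch. It is not the NuPIC implementation: the `PhaseCycler` class and its `(phaseName, nIters)` tuples are hypothetical stand-ins, used only to show how a driver might stay in each phase for a fixed number of records and then wrap around to the first phase.

```python
import itertools


class PhaseCycler(object):
    """Hypothetical stand-in for the iteration-cycle state machine."""

    def __init__(self, phaseSpecs):
        # phaseSpecs: list of (phaseName, nIters) tuples (assumed format).
        if not phaseSpecs or any(nIters <= 0 for _, nIters in phaseSpecs):
            raise ValueError("need at least one phase, each with nIters > 0")
        self._phases = itertools.cycle(phaseSpecs)
        self._name, self._remaining = next(self._phases)

    def advance(self):
        """Return the phase for the current record, then step the machine."""
        name = self._name
        self._remaining -= 1
        if self._remaining == 0:
            # Current phase is exhausted; move to the next one (wraps around).
            self._name, self._remaining = next(self._phases)
        return name


cycler = PhaseCycler([("learnOnly", 2), ("inferOnly", 1)])
phases = [cycler.advance() for _ in range(5)]
# phases == ["learnOnly", "learnOnly", "inferOnly", "learnOnly", "learnOnly"]
```

Each call to `advance()` corresponds to one incoming data row, mirroring the way an iteration is defined as a single handleInputRecord() call.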

Note

For the purposes of testing predictions and generating metrics, OPFTaskDriver assumes that all incoming dataset records are “sensor data”, i.e., ground truth. However, if you’re using OPFTaskDriver only to generate predictions, and not to test predictions or generate metrics, the records don’t need to be “ground truth”.

class nupic.frameworks.opf.opf_task_driver.OPFTaskDriver(taskControl, model)

Task Phase Driver implementation

Conceptually, the client injects input records, one at a time, into an OPFTaskDriver instance for execution according to the current IterationPhase as maintained by the OPFTaskDriver instance.

Parameters:
  • taskControl – (dict) conforming to opfTaskControlSchema.json that defines the actions to be performed on the given model.
  • model – (nupic.frameworks.opf.model.Model) that this OPFTaskDriver instance will drive.
finalize()

Perform final activities, including ‘finish’ callbacks. This method MUST be called once after the last call to handleInputRecord().

getMetricLabels()
Returns:(list) labels for the metrics that are being calculated
getMetrics()

Gets the current metric values

Returns:A dictionary of metric values. The key for each entry is the label for the metric spec, as generated by nupic.frameworks.opf.metrics.MetricSpec.getLabel(). The value for each entry is a dictionary containing the value of the metric as returned by nupic.frameworks.opf.metrics.MetricsIface.getMetric().
handleInputRecord(inputRecord)

Processes the given record according to the current iteration cycle phase

Parameters:inputRecord – (object) record expected to be returned from nupic.data.record_stream.RecordStreamIface.getNextRecord().
Returns:nupic.frameworks.opf.opf_utils.ModelResult
replaceIterationCycle(phaseSpecs)

Replaces the Iteration Cycle phases

Parameters:phaseSpecs – Iteration cycle description consisting of a sequence of IterationPhaseSpec elements (IterationPhaseSpecLearnOnly, IterationPhaseSpecInferOnly, or IterationPhaseSpecLearnAndInfer) that are performed in the given order.
setup()

Performs initial setup activities, including ‘setup’ callbacks. This method MUST be called once before the first call to handleInputRecord().
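
The required call order (setup() once, then handleInputRecord() per record, then finalize() once) can be sketched as follows. `StubTaskDriver` and `runTask` are hypothetical names introduced only for illustration; the stub records calls instead of driving a real model, and its return value is a plain dict rather than a ModelResult.

```python
class StubTaskDriver(object):
    """Hypothetical stand-in that exposes the same method names as OPFTaskDriver."""

    def __init__(self):
        self.calls = []

    def setup(self):
        self.calls.append("setup")

    def handleInputRecord(self, inputRecord):
        self.calls.append("handleInputRecord")
        # The real method returns a ModelResult; a dict is used here instead.
        return {"input": inputRecord}

    def finalize(self):
        self.calls.append("finalize")


def runTask(driver, records):
    """Drive one task: setup once, one call per record, finalize once."""
    driver.setup()
    try:
        return [driver.handleInputRecord(record) for record in records]
    finally:
        # Run finalize even if a record fails (a design choice of this sketch).
        driver.finalize()


driver = StubTaskDriver()
results = runTask(driver, [{"value": 1}, {"value": 2}])
```

Wrapping the record loop in try/finally is one way to guarantee the single finalize() call; whether that is appropriate after a failure depends on what the 'finish' callbacks do.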

class nupic.frameworks.opf.opf_task_driver.IterationPhaseSpecLearnOnly(nIters)

This class represents the Learn-only phase of the Iteration Cycle in the TaskControl block of description.py

Parameters:nIters – (int) iterations to remain in this phase. An iteration corresponds to a single OPFTaskDriver.handleInputRecord() call.
class nupic.frameworks.opf.opf_task_driver.IterationPhaseSpecInferOnly(nIters, inferenceArgs=None)

This class represents the Infer-only phase of the Iteration Cycle in the TaskControl block of description.py

Parameters:
  • nIters – (int) Number of iterations to remain in this phase. An iteration corresponds to a single OPFTaskDriver.handleInputRecord() call.
  • inferenceArgs – (dict) A dictionary of arguments required for inference. These depend on the InferenceType of the current model.
class nupic.frameworks.opf.opf_task_driver.IterationPhaseSpecLearnAndInfer(nIters, inferenceArgs=None)

This class represents the Learn-and-Infer phase of the Iteration Cycle in the TaskControl block of description.py

Parameters:
  • nIters – (int) Number of iterations to remain in this phase. An iteration corresponds to a single OPFTaskDriver.handleInputRecord() call.
  • inferenceArgs – (dict) A dictionary of arguments required for inference. These depend on the InferenceType of the current model.

OPF Environment

This file describes the interfaces for adapting OPFTaskDriver to specific environments.

These interfaces encapsulate external specifics, such as the data source (e.g., .csv file or database), prediction sink (e.g., .csv file or database), and report and serialization destinations.

class nupic.frameworks.opf.opf_environment.PredictionLoggerIface

This class defines the interface for OPF prediction logger implementations.

checkpoint(checkpointSink, maxRows)

Save a checkpoint of the prediction output stream. The checkpoint comprises up to maxRows of the most recent inference records.

Parameters:
  • checkpointSink – A File-like object where predictions checkpoint data, if any, will be stored.
  • maxRows – (int) Maximum number of most recent inference rows to checkpoint.
close()

Closes the connection to the output store and cleans up any resources associated with writing.

setLoggedMetrics(metricNames)

Sets which metrics should be written to the prediction log.

Parameters:metricNames – (list) metric names that match the labels of the metrics that should be written to the prediction log
writeRecord(modelResult)

Emits the input data, inferences, and metrics that a model produced from a single record.

Parameters:modelResult – (nupic.frameworks.opf.opf_utils.ModelResult) contains the model input and output for the current timestep.
writeRecords(modelResults, progressCB=None)

Same as writeRecord(), but emits multiple rows in one shot.

Parameters:
  • modelResults – (list) of nupic.frameworks.opf.opf_utils.ModelResult objects, each represents one record.
  • progressCB – (func) optional callback method that will be called after each batch of records is written.
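
As a rough illustration of the interface above, the following sketch keeps records in memory and writes the most recent rows to a file-like sink on checkpoint. It is an assumption-laden toy: `InMemoryPredictionLogger` does not subclass the real PredictionLoggerIface, model results are plain dicts, the checkpoint format (one JSON object per line) is invented for this example, and progressCB is assumed to take no arguments.

```python
import io
import json


class InMemoryPredictionLogger(object):
    """Hypothetical logger mirroring the PredictionLoggerIface method names."""

    def __init__(self):
        self._rows = []
        self._loggedMetrics = []

    def setLoggedMetrics(self, metricNames):
        # This sketch only stores the names; it does not filter output by them.
        self._loggedMetrics = list(metricNames)

    def writeRecord(self, modelResult):
        self._rows.append(modelResult)

    def writeRecords(self, modelResults, progressCB=None):
        for modelResult in modelResults:
            self.writeRecord(modelResult)
        if progressCB is not None:
            # The whole list is treated as a single batch here.
            progressCB()

    def checkpoint(self, checkpointSink, maxRows):
        # Keep at most maxRows of the most recent rows (none if maxRows is 0).
        start = max(0, len(self._rows) - maxRows)
        for row in self._rows[start:]:
            checkpointSink.write(json.dumps(row) + "\n")

    def close(self):
        self._rows = []


logger = InMemoryPredictionLogger()
logger.writeRecords([{"step": n} for n in range(5)])
sink = io.StringIO()
logger.checkpoint(sink, 2)
checkpointLines = sink.getvalue().splitlines()
```

The explicit `start` index avoids the `rows[-0:]` pitfall, where a maxRows of 0 would otherwise select every row.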

OPF Basic Environment

This script provides a file-based implementation of the opf_environment interfaces (OPF).

This “basic” implementation of the interface (need a better name instead of “basic”) uses files (.csv, etc.) versus Nupic’s implementation that would use databases.

This implementation is used by research tools, such as scripts/run_opf_experiment.py.

The opf_environment interfaces encapsulate external specifics, such as the data source (e.g., .csv file or database), prediction sink (e.g., .csv file or database), and report and serialization destinations.

class nupic.frameworks.opf.opf_basic_environment.PredictionMetricsLoggerIface

This is the interface for output of prediction metrics.

emitFinalMetrics(metrics)

Emits final metrics.

Note

The intention is that the final metrics may go to a different place (e.g., a csv file) than the output of emitPeriodicMetrics() (e.g., stdout).

Parameters:metrics – Metrics as returned by nupic.frameworks.opf.opf_task_driver.OPFTaskDriver.getMetrics().
emitPeriodicMetrics(metrics)

Emits periodic metrics to stdout in JSON.

Parameters:metrics – Metrics as returned by nupic.frameworks.opf.opf_task_driver.OPFTaskDriver.getMetrics().
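
A minimal sketch of emitting metrics as JSON is shown below. The function name `emitMetricsAsJson`, the optional `stream` argument, the sample metric label, and the one-object-per-line layout are all assumptions made for illustration; the actual output format of the file-based logger is not specified here.

```python
import io
import json
import sys


def emitMetricsAsJson(metrics, stream=None):
    """Write a metrics dictionary as one line of JSON (hypothetical helper)."""
    stream = sys.stdout if stream is None else stream
    # sort_keys gives a stable field order across calls.
    stream.write(json.dumps(metrics, sort_keys=True) + "\n")


# Made-up label and value, shaped like a label -> value-dictionary mapping.
sampleMetrics = {"hypotheticalLabel": {"value": 0.25}}
buffer = io.StringIO()
emitMetricsAsJson(sampleMetrics, stream=buffer)
```

Passing a stream makes the helper easy to redirect, which matches the note that final and periodic metrics may go to different destinations.
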
class nupic.frameworks.opf.opf_basic_environment.DatasetReaderIface

This is the interface class for dataset readers.

getDatasetFieldMetaData()
Returns:a tuple of dataset field metadata descriptors that are arranged in the same order as the columns in the dataset. Each field metadata descriptor is of type nupic.data.field_meta.FieldMetaInfo
next()
Returns:The next record from the dataset. The returned record object is of the same structure as returned by nupic.data.record_stream.RecordStreamIface.getNextRecord(). Returns None if the next record is not available yet.
Raises:(StopIteration) if a hard “end of file” has been reached and no more records will be forthcoming.
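
The two end-of-data signals of next() (None for "not available yet", StopIteration for a hard end of file) suggest a consumer loop along these lines. `ListDatasetReader`, `drain`, and the `maxEmptyPolls` limit are hypothetical and exist only to make the sketch runnable; a real consumer would likely wait between polls rather than count them.

```python
class ListDatasetReader(object):
    """Hypothetical reader; None entries simulate 'record not available yet'."""

    def __init__(self, rows):
        self._rows = iter(rows)

    def next(self):
        # Raises StopIteration once the underlying rows are exhausted.
        return next(self._rows)


def drain(reader, maxEmptyPolls=3):
    """Collect records until hard EOF or too many consecutive empty polls."""
    records = []
    emptyPolls = 0
    while emptyPolls < maxEmptyPolls:
        try:
            record = reader.next()
        except StopIteration:
            break  # hard end of file: no more records will arrive
        if record is None:
            emptyPolls += 1  # nothing yet; try again
            continue
        emptyPolls = 0
        records.append(record)
    return records


collected = drain(ListDatasetReader([[1], None, [2]]))
```
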
class nupic.frameworks.opf.opf_basic_environment.BasicPredictionMetricsLogger(experimentDir, label)

Bases: nupic.frameworks.opf.opf_basic_environment.PredictionMetricsLoggerIface

This is the file-based implementation of the interface for output of prediction metrics.

TODO: where should periodic and final predictions go (versus stdout)

Parameters:
  • experimentDir – (string) path to directory for experiment to run.
  • label – (string) used to distinguish the output’s container (e.g., filename, directory name, property key, etc.).
class nupic.frameworks.opf.opf_basic_environment.BasicDatasetReader(streamDefDict)

Bases: nupic.frameworks.opf.opf_basic_environment.DatasetReaderIface

This is a CSV file-based implementation of DatasetReaderIface.

Parameters:streamDefDict – stream definition, as defined here.
class nupic.frameworks.opf.opf_basic_environment.NonTemporalPredictionLogAdapter(writer)

This class serves as an adapter for a client-instantiated Non-temporal log writer.

Parameters:writer – (PredictionWriterIface) Non-temporal prediction log writer
update(modelResult)

Emits an input/prediction pair, if possible.

Parameters:modelResult – (nupic.frameworks.opf.opf_utils.ModelResult) contains the model input and output for the current timestep.
class nupic.frameworks.opf.opf_basic_environment.BasicPredictionLogger(fields, experimentDir, label, inferenceType, checkpointSource=None)

Bases: nupic.frameworks.opf.opf_environment.PredictionLoggerIface

This class implements logging of predictions to files as actual vs predicted values.

Parameters:
  • fields – (list) of nupic.data.field_meta.FieldMetaInfo objects representing the encoder-mapped data row field value sequences that will be emitted to this prediction logger.
  • experimentDir – (string) experiment directory path that contains description.py
  • label – (string) to incorporate into the filename.
  • checkpointSource – If not None, a File-like object containing the previously-checkpointed predictions for setting the initial contents of this output stream. Will be copied before returning, if needed.