Data

Data wrappers and helpers.

Field Meta

This module defines the structure of the meta-data describing a field in a dataset: the field name, field type, special field attribute, etc.

class nupic.data.field_meta.FieldMetaInfo(name, type, special)

This class acts as a container of meta-data for a single field (column) of a dataset. Each instance of this class has name, type, and special properties.

Examples:

  1. Access a sub-element from an instance of FieldMetaInfo:

    • metainfo.name
    • metainfo.type
    • metainfo.special
  2. Create a single element of FieldMetaInfo from a tuple of name, type, and special:

    e = ('pounds', FieldMetaType.float, FieldMetaSpecial.none)
    m = FieldMetaInfo.createFromFileFieldElement(e)
    
Parameters:
  • name (str) – field name
  • type (str) – one of the values from FieldMetaType
  • special (str) – one of the values from FieldMetaSpecial
Raises:

ValueError – if type or special arg values are invalid

static createFromFileFieldElement(fieldInfoTuple)

Creates a field_meta.FieldMetaInfo instance from a tuple containing name, type, and special.

Parameters:fieldInfoTuple – Must contain name, type, and special
Returns:(FieldMetaInfo) instance
classmethod createListFromFileFieldList(fields)

Creates a FieldMetaInfo list from a list of tuples. Basically runs createFromFileFieldElement() on each tuple.

Example:

# Create a list of FieldMetaInfo instances from a list of File meta-data
# tuples
el = [("pounds", FieldMetaType.float, FieldMetaSpecial.none),
      ("price", FieldMetaType.float, FieldMetaSpecial.none),
      ("id", FieldMetaType.string, FieldMetaSpecial.sequence),
      ("date", FieldMetaType.datetime, FieldMetaSpecial.timestamp),
     ]
ml = FieldMetaInfo.createListFromFileFieldList(el)
Parameters:fields – a sequence of field attribute tuples conforming to the format of name, type, and special
Returns:A list of FieldMetaInfo elements corresponding to the given ‘fields’ list.
class nupic.data.field_meta.FieldMetaInfoBase(name, type, special)
name

Alias for field number 0

special

Alias for field number 2

type

Alias for field number 1

class nupic.data.field_meta.FieldMetaSpecial

Public values for the “special” field attribute. Valid values are:

  • R: reset
  • S: sequence
  • T: timestamp
  • C: category
  • L: learning
classmethod isValid(attr)

Checks whether a candidate value is one of the valid “special” attributes.

Parameters:attr – (string) candidate value
Returns:True if the candidate value is a legitimate “special” field attribute; False if not
class nupic.data.field_meta.FieldMetaType

Public values for the field data types. Valid types are:

  • string
  • datetime
  • int
  • float
  • bool
  • list
  • sdr
classmethod isValid(fieldDataType)

Checks whether a candidate value is one of the valid field data types.

Parameters:fieldDataType – (string) candidate field data type
Returns:True if the candidate value is a legitimate field data type value; False if not

FileRecordStream

CSV file based implementation of a record stream

FileRecordStream is a class that can read and write .csv files that contain records. The file has 3 header lines that contain, for each field, the name (line 1), type (line 2), and a special indicator (line 3). The special indicator can indicate that the field specifies a reset, is a sequence ID, or is a timestamp for the record.

You can see an example of a NuPIC data file formatted for FileRecordStream in the quick start.

The header lines look like:

f1,f2,f3,....fN
int,string,datetime,bool,...
R,S,T,,,,....

The data lines are just comma separated values that match the types in the second header line. The supported types are: int, float, string, bool, datetime

The format for datetime fields is yyyy-mm-dd hh:mm:ss.us. The ‘us’ component is microseconds.
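The layout described above can be reproduced with the standard csv module. The following is an illustrative sketch only (it is not FileRecordStream itself): it writes a minimal file in this format, three header rows followed by a data row, then reads it back and converts each value according to its declared type.

```python
import csv
import io
from datetime import datetime

# Build a minimal data file in the FileRecordStream layout, in memory.
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["timestamp", "gym", "consumption"])   # line 1: field names
w.writerow(["datetime", "string", "float"])       # line 2: field types
w.writerow(["T", "S", ""])                        # line 3: specials
w.writerow(["2010-07-02 00:00:00.0", "Balgowlah", "5.3"])

# Read it back: the first three rows describe the fields.
buf.seek(0)
r = csv.reader(buf)
names, types, specials = next(r), next(r), next(r)
row = next(r)

# Convert each value according to its declared type.
record = []
for value, ftype in zip(row, types):
    if ftype == "datetime":
        record.append(datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f"))
    elif ftype == "float":
        record.append(float(value))
    else:
        record.append(value)
```

The field names (`timestamp`, `gym`, `consumption`) and values here are made up for illustration; FileRecordStream performs this header parsing and type conversion for you.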

When reading a file, FileRecordStream automatically reads the header lines, figures out the type of each field, and determines which fields (if any) are the timestamp, reset, and sequenceId fields.

FileRecordStream supports the context manager (with statement) protocol. That means you can do:

with FileRecordStream(filename) as f:
  ...
  ...

When the control exits the with block the file will be closed automatically. You may still call the close() method at any point (even multiple times).

FileRecordStream also supports the iteration protocol so you may read its contents using a for loop:

for r in f:
  print r
class nupic.data.file_record_stream.FileRecordStream(streamID, write=False, fields=None, missingValues=None, bookmark=None, includeMS=True, firstRecord=None)

CSV file based RecordStream implementation

Each field is a 3-tuple (name, type, special or FieldMetaSpecial.none)

The name is the name of the field. The type is one of the constants in FieldMetaType. The special is one of the FieldMetaSpecial values that designates the field as the sequenceId, reset, timestamp, or category field. With the exception of category fields, there can be at most one field of each special type. There may be multiple fields of type datetime, but no more than one of them may be the timestamp field (FieldMetaSpecial.timestamp). The sequence id field must be either a string or an int. The reset field must be an int (and must contain 0 or 1).

The category field must be an int or space-separated list of ints, where the former represents single-label classification and the latter is for multi-label classification (e.g. “1 3 4” designates a record for labels 1, 3, and 4). The number of categories is allowed to vary from record to record; sensor regions represent non-categories with -1, thus the category values must be >= 0.
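The single- vs. multi-label convention above can be handled with a small helper; the function name below is illustrative only, not part of the API:

```python
def parseCategories(value):
    """Parse a category field value: an int (single-label) or a
    space-separated list of ints (multi-label), e.g. "1 3 4".
    Category values must be >= 0."""
    cats = [int(x) for x in str(value).split()]
    if any(c < 0 for c in cats):
        raise ValueError("category values must be >= 0")
    return cats
```

For example, `parseCategories("1 3 4")` yields `[1, 3, 4]` and `parseCategories(7)` yields `[7]`.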

The FileRecordStream iterates over the field names, types and specials and stores the information.

Parameters:
  • streamID – CSV file name, input or output
  • write – True or False, open for writing if True
  • fields – a list of nupic.data.fieldmeta.FieldMetaInfo field descriptors, only applicable when write==True
  • missingValues – what missing values should be replaced with
  • bookmark – a reference to a previous reader; if passed in, the records will be returned starting from the point where the bookmark was requested. Either bookmark or firstRecord can be specified, but not both; if bookmark is used, firstRecord MUST be None.
  • includeMS – If False, the microseconds portion is not included in the generated output file’s timestamp fields. This makes the file compatible with import into Excel.
  • firstRecord – 0-based index of the first record to start reading from. Either bookmark or firstRecord can be specified, but not both; if bookmark is used, firstRecord MUST be None.
appendRecord(record)

Saves the record in the underlying csv file.

Parameters:record – a list of Python objects that will be string-ified
appendRecords(records, progressCB=None)

Saves multiple records in the underlying storage.

Parameters:
  • records – array of records as in appendRecord()
  • progressCB – (function) callback to report progress
clearStats()

Resets stats collected so far.

close()

Closes the stream.

flush()

Flushes the file.

getBookmark()

Gets a bookmark or anchor to the current position.

Returns:an anchor to the current position in the data. Passing this anchor to the constructor makes the current position the first returned record.
getDataRowCount()
Returns:(int) count of data rows in dataset (excluding header lines)
getError()

Not implemented. The CSV file version does not provide storage for the error information.

getFieldNames()
Returns:(list) field names associated with the data.
getFields()
Returns:a sequence of FieldMetaInfo name/type/special tuples for each field in the stream.
getNextRecord(useCache=True)

Returns next available data record from the file.

Returns:a data row (a list or tuple) if available; None, if no more records in the table (End of Stream - EOS); empty sequence (list or tuple) when timing out while waiting for the next record.
getNextRecordIdx()
Returns:(int) the index of the record that will be read next from getNextRecord().
getStats()

Parses the file using a dedicated reader and collects field stats. The file is not parsed unless the user of FileRecordStream invokes getStats().

Returns:a dictionary of stats. In the current implementation, min and max fields are supported. Example of the return dictionary is:
{
  'min' : [f1_min, f2_min, None, None, fn_min],
  'max' : [f1_max, f2_max, None, None, fn_max]
}

(where fx_min/fx_max are set for scalar fields, or None if not)

isCompleted()

Not implemented. CSV file is always considered completed.

next()

Implements the iterator protocol.

recordsExistAfter(bookmark)

Returns whether there are more records after the current position. The bookmark is not used in this implementation.

Returns:True if there are records left after current position.
rewind()

Put us back at the beginning of the file again.

seekFromEnd(numRecords)

Seeks to numRecords from the end and returns a bookmark to the new position.

Parameters:numRecords – how far to seek from end of file.
Returns:bookmark to desired location.
setAutoRewind(autoRewind)

Controls whether getNextRecord() should automatically rewind the source when EOF is reached.

Parameters:autoRewind – (bool)

setCompleted(completed=True)

Not implemented: CSV file is always considered completed, nothing to do.

setError(error)

Not implemented. The CSV file version does not provide storage for the error information.

RecordStream

Interface for different types of storages (file, hbase, rio, etc).

class nupic.data.record_stream.RecordStreamIface

This is the interface for the record input/output storage classes.

appendRecord(record)

Saves the record in the underlying storage. Should be implemented in subclasses.

Parameters:record – (object) to store
appendRecords(records, progressCB=None)

Saves multiple records in the underlying storage. Should be implemented in subclasses.

Parameters:
  • records – (list) of objects to store
  • progressCB – (func) called after each record is appended
clearStats()

Resets stats collected so far.

close()

Close the stream

flush()

Flush the file to disk

getAggregationMonthsAndSeconds()

Returns the aggregation period of the record stream as a dict containing ‘months’ and ‘seconds’. The months value is always an integer and the seconds value a floating point; only one is allowed to be non-zero.

If there is no aggregation associated with the stream, returns None.

Typically, a raw file or hbase stream will NOT have any aggregation info, but subclasses of RecordStreamIface, like StreamReader, will and will return the aggregation period from this call. This call is used by getNextRecordDict() to assign a record number to a record given its timestamp and the aggregation interval.

Returns:None
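getNextRecordDict() is described above as using the aggregation period to assign a record number to a record given its timestamp. One way to do that, sketched under the assumption of a purely seconds-based period (the helper name is illustrative; a months-based period would require calendar arithmetic instead):

```python
from datetime import datetime

def recordNumber(timestamp, referenceTime, aggSeconds):
    """Illustrative only: map a timestamp onto a 0-based record index,
    given a seconds-based aggregation period and a reference start time.
    Every record falling in the same aggregation interval gets the same
    index."""
    elapsed = (timestamp - referenceTime).total_seconds()
    return int(elapsed // aggSeconds)
```

For example, with a one-hour aggregation period, all timestamps within the same hour after the reference time map to the same record number.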
getBookmark()

Returns an anchor to the current position in the data. Passing this anchor to the constructor makes the current position the first returned record. If the record is no longer in storage, the first available record after it will be returned.

Returns:anchor to current position in the data.
getCategoryFieldIdx()
Returns:(int) index of category field
getError()
Returns:errors saved in the storage.
getFieldMax(fieldName)

If underlying implementation does not support min/max stats collection, or if a field type does not support min/max (non scalars), the return value will be None.

Parameters:fieldName – (string) name of field to get max
Returns:current maximum value for the field fieldName.
getFieldMin(fieldName)

If underlying implementation does not support min/max stats collection, or if a field type does not support min/max (non scalars), the return value will be None.

Parameters:fieldName – (string) name of field to get min
Returns:current minimum value for the field fieldName.
getFieldNames()
Returns:(list) of field names associated with the data.
getFields()
Returns:(list) of nupic.data.fieldmeta.FieldMetaInfo objects for each field in the stream. Might be None, if that information is provided externally (through the Stream Definition, for example).
getLearningFieldIdx()
Returns:(int) index of the learning field.
getNextRecord(useCache=True)

Returns next available data record from the storage. If useCache is False, then don’t read ahead and don’t cache any records.

Returns:a data row (a list or tuple) if available; None, if no more records in the table (End of Stream - EOS); empty sequence (list or tuple) when timing out while waiting for the next record.
getNextRecordDict()

Returns next available data record from the storage as a dict, with the keys being the field names. This also adds in some meta fields:

  • _category: The value from the category field (if any)
  • _reset: True if the reset field was True (if any)
  • _sequenceId: the value from the sequenceId field (if any)
getNextRecordIdx()
Returns:(int) index of the record that will be read next from getNextRecord()
getResetFieldIdx()
Returns:(int) index of the reset field; None if no such field.
getSequenceIdFieldIdx()
Returns:(int) index of the sequenceId field.
getStats()
Returns:storage stats (like min and max values of the fields).
getTimestampFieldIdx()
Returns:(int) index of the timestamp field.
isCompleted()
Returns:True if all records are already in the storage, or False if more records are expected.
recordsExistAfter(bookmark)
Parameters:bookmark – (int) where to start
Returns:True if there are records left after the bookmark.
rewind()

Put us back at the beginning of the file again.

seekFromEnd(numRecords)
Parameters:numRecords – (int) number of records from the end.
Returns:(int) a bookmark numRecords from the end of the stream.
setCompleted(completed)

Marks the stream completed.

Parameters:completed – (bool) is completed?
setError(error)

Saves specified error in the storage.

Parameters:error – Error to store.
setTimeout(timeout)

Set the read timeout in seconds

Parameters:timeout – (int or floating point)

StreamReader

class nupic.data.stream_reader.StreamReader(streamDef, bookmark=None, saveOutput=False, isBlocking=True, maxTimeout=0, eofOnTimeout=False)

Implements a stream reader. This is a high level class that owns one or more underlying implementations of a RecordStreamIface. Each RecordStreamIface implements the raw reading of records from the record store (which could be a file, hbase table or something else).

In the future, we will support joining of two or more RecordStreamIface instances (which is why the streamDef accepts a list of ‘stream’ elements), but for now only one source is supported.

The class also implements aggregation of the (in the future) joined records from the sources.

This module parses the stream definition (as defined in /src/nupic/frameworks/opf/jsonschema/stream_def.json), creates the RecordStreamIface for each source (‘stream’ element) defined in the stream def, performs aggregation, and returns each record in the correct format according to the desired column names specified in the streamDef.

This class implements the RecordStreamIface interface and thus can be used in place of a raw record stream.

This is an example streamDef:

{
  'version': 1,
  'info': 'test_hotgym',

  'streams': [
      {'columns': [u'*'],
       'info': u'hotGym.csv',
       'last_record': 4000,
       'source': u'file://extra/hotgym/hotgym.csv'}
  ],

  'timeField': 'timestamp',

  'aggregation': {
    'hours': 1,
    'fields': [
        ('timestamp', 'first'),
        ('gym', 'first'),
        ('consumption', 'sum')
    ],
  }

}
Parameters:
  • streamDef – The stream definition, potentially containing multiple sources (not supported yet). See src/nupic/frameworks/opf/jsonschema/stream_def.json for the format of this dict
  • bookmark – Bookmark to start reading from. This overrides the first_record field of the streamDef if provided.
  • saveOutput – If true, save the output to a csv file in a temp directory. The path to the generated file can be found in the log output.
  • isBlocking – should the read operation block forever if the next row of data is not available, but the stream is not yet marked as ‘completed’?
  • maxTimeout – if isBlocking is False, max seconds to wait for more data before timing out; ignored when isBlocking is True.
  • eofOnTimeout – If True and we get a read timeout (isBlocking must be False to get read timeouts), assume we’ve reached the end of the input and produce the last aggregated record, if one can be completed.
clearStats()

Resets stats collected so far.

close()

Close the stream

getAggregationMonthsAndSeconds()

Returns the aggregation period of the record stream as a dict containing ‘months’ and ‘seconds’. The months is always an integer and seconds is a floating point. Only one is allowed to be non-zero at a time.

Will return the aggregation period from this call. This call is used by the nupic.data.record_stream.RecordStream.getNextRecordDict() method to assign a record number to a record given its timestamp and the aggregation interval.

Returns:aggregationPeriod (as a dict) where:
  • months: number of months in aggregation period
  • seconds: number of seconds in aggregation period (as a float)
getBookmark()
Returns:a bookmark to the current position
getDataRowCount()

Iterates through stream to calculate total records after aggregation. This will alter the bookmark state.

getError()
Returns:errors saved in the stream.
getFieldNames()

Returns all fields in all inputs (list of plain names).

Note

currently, only one input is supported

getFields()
Returns:a sequence of nupic.data.fieldmeta.FieldMetaInfo for each field in the stream.
getNextRecord()

Returns combined data from all sources (values only).

Returns:None on EOF; empty sequence on timeout.
getNextRecordIdx()
Returns:the index of the record that will be read next from getNextRecord().
getStats()

TODO: This method needs to be enhanced to get the stats on the aggregated records.

Returns:stats (like min and max values of the fields).
isCompleted()
Returns:True if all records have been read.
recordsExistAfter(bookmark)
Returns:True if there are records left after the bookmark.
setCompleted(completed=True)

Marks the stream completed (True or False)

Parameters:completed – (bool) is completed or not
setError(error)

Saves specified error in the stream.

Parameters:error – to save
setTimeout(timeout)

Set the read timeout.

Parameters:timeout – (float or int) timeout length

Stream Definition

See an example at Stream Definition.

Utilities

Collection of utilities to process input data

nupic.data.utils.DATETIME_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S:%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y-%m-%d', '%m/%d/%Y %H:%M', '%m/%d/%y %H:%M', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S')

These are the supported timestamp formats to parse. The first is the format used by NuPIC when serializing datetimes.
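A format list like this is typically consumed by trying each format in order until one matches. The sketch below re-implements that idea with the stdlib; the tuple copies the values shown above, and the function name is illustrative (the library’s own entry point is parseTimestamp(), documented below):

```python
from datetime import datetime

DATETIME_FORMATS = (
    '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S:%f',
    '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y-%m-%d',
    '%m/%d/%Y %H:%M', '%m/%d/%y %H:%M',
    '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ',
    '%Y-%m-%dT%H:%M:%S')

def tryParseTimestamp(s):
    """Try each supported format in order; raise ValueError if none
    of them matches the input string."""
    for fmt in DATETIME_FORMATS:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            pass
    raise ValueError("Unable to parse timestamp: %r" % s)
```

The first format in the tuple is the one NuPIC uses when serializing, so round-tripping a serialized timestamp always succeeds on the first attempt.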

nupic.data.utils.escape(s)

Escape commas, tabs, newlines and dashes in a string

Commas are encoded as tabs.

Parameters:s – (string) to escape
Returns:(string) escaped string
nupic.data.utils.floatOrNone(f)

Tries to convert the input to a float; returns None on failure.

Parameters:f – (object) thing to convert to a float
Returns:(float or None)
nupic.data.utils.intOrNone(i)

Tries to convert the input to an int; returns None on failure.

Parameters:i – (object) thing to convert to an int
Returns:(int or None)
nupic.data.utils.parseBool(s)

String to boolean

Parameters:s – (string)
Returns:(bool)
nupic.data.utils.parseSdr(s)

Parses a string containing only 0’s and 1’s and returns a Python list object.

Parameters:s – (string) string to parse
Returns:(list) SDR out
nupic.data.utils.parseStringList(s)

Parse a string of space-separated numbers, returning a Python list.

Parameters:s – (string) to parse
Returns:(list) of numbers
nupic.data.utils.parseTimestamp(s)

Parses a textual datetime format and returns a Python datetime object.

The supported format is: yyyy-mm-dd hh:mm:ss.us

The time component is optional.

  • hours are 00..23 (no AM/PM)
  • minutes are 00..59
  • seconds are 00..59
  • microseconds are 000000..999999
Parameters:s – (string) input time text
Returns:(datetime.datetime)
nupic.data.utils.serializeSdr(sdr)

Serializes a Python list object containing only 0’s and 1’s to a string.

Parameters:sdr – (list) binary
Returns:(string) SDR out
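serializeSdr() and parseSdr() (documented above) are inverses of each other. Minimal stdlib equivalents of the pair, as sketches rather than the library code (the function names below are illustrative):

```python
def parseSdrSketch(s):
    """'0110' -> [0, 1, 1, 0]; accepts only strings of 0's and 1's."""
    if set(s) - {'0', '1'}:
        raise ValueError("SDR string may contain only 0's and 1's")
    return [int(c) for c in s]

def serializeSdrSketch(sdr):
    """[0, 1, 1, 0] -> '0110'."""
    return ''.join(str(bit) for bit in sdr)
```

Round-tripping a list through serialize then parse returns the original list.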
nupic.data.utils.serializeTimestamp(t)

Turns a datetime object into a string.

Parameters:t – (datetime.datetime)
Returns:(string) in default format (see DATETIME_FORMATS [0])
nupic.data.utils.serializeTimestampNoMS(t)

Turns a datetime object into a string ignoring milliseconds.

Parameters:t – (datetime.datetime)
Returns:(string) in default format (see DATETIME_FORMATS [2])
nupic.data.utils.stripList(listObj)

Convert a list of numbers to a string of space-separated values.

Parameters:listObj – (list) to convert
Returns:(string) of space-separated values
nupic.data.utils.unescape(s)

Unescapes a string that may contain commas, tabs, newlines and dashes

Commas are decoded from tabs.

Parameters:s – (string) to unescape
Returns:(string) unescaped string