Data

Data wrappers and helpers.
Field Meta

This module defines the structure of the meta-data that describes each field in a dataset: the field name, field type, special field attribute, etc.
class nupic.data.field_meta.FieldMetaInfo(name, type, special)

    This class acts as a container of meta-data for a single field (column)
    of a dataset. Each instance has name, type, and special properties.

    Examples:

    Access a sub-element from an instance of FieldMetaInfo:

        metainfo.name
        metainfo.type
        metainfo.special

    Create a single FieldMetaInfo element from a tuple of name, type, and
    special:

        e = ('pounds', FieldMetaType.float, FieldMetaSpecial.none)
        m = FieldMetaInfo.createFromFileFieldElement(e)

    Parameters:
        - name (str) – field name
        - type (str) – one of the values from FieldMetaType
        - special (str) – one of the values from FieldMetaSpecial
    Raises:
        ValueError – if the type or special arg values are invalid
    static createFromFileFieldElement(fieldInfoTuple)

        Creates a fieldmeta.FieldMetaInfo instance from a tuple containing
        name, type, and special.

        Parameters:
            fieldInfoTuple – must contain name, type, and special
        Returns:
            a FieldMetaInfo instance
    classmethod createListFromFileFieldList(fields)

        Creates a list of FieldMetaInfo instances from a list of tuples, by
        running createFromFileFieldElement() on each tuple.

        Example:

            # Create a list of FieldMetaInfo instances from a list of
            # file meta-data tuples
            el = [("pounds", FieldMetaType.float, FieldMetaSpecial.none),
                  ("price", FieldMetaType.float, FieldMetaSpecial.none),
                  ("id", FieldMetaType.string, FieldMetaSpecial.sequence),
                  ("date", FieldMetaType.datetime, FieldMetaSpecial.timestamp)]
            ml = FieldMetaInfo.createListFromFileFieldList(el)

        Parameters:
            fields – a sequence of (name, type, special) field attribute tuples
        Returns:
            a list of FieldMetaInfo elements corresponding to the given
            'fields' list
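The (name, type, special) triple can be modeled with a plain namedtuple. This is an illustrative, stdlib-only stand-in for FieldMetaInfo (the names FieldMeta and create_list_from_tuples are hypothetical, not part of NuPIC):

```python
from collections import namedtuple

# Hypothetical stand-in for FieldMetaInfo: a (name, type, special)
# triple with attribute access.
FieldMeta = namedtuple("FieldMeta", ["name", "type", "special"])

def create_list_from_tuples(fields):
    """Mirrors the createListFromFileFieldList() pattern: one FieldMeta
    instance per (name, type, special) tuple."""
    return [FieldMeta(*t) for t in fields]

ml = create_list_from_tuples([("pounds", "float", ""),
                              ("date", "datetime", "T")])
print(ml[1].name, ml[1].special)  # date T
```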
class nupic.data.field_meta.FieldMetaType

    Public values for the field data types. Valid types are:

    - string
    - datetime
    - int
    - float
    - bool
    - list
    - sdr
    classmethod isValid(fieldDataType)

        Checks whether a candidate value is one of the valid field data
        types.

        Parameters:
            fieldDataType – (string) candidate field data type
        Returns:
            True if the candidate value is a legitimate field data type
            value; False if not
class nupic.data.field_meta.FieldMetaSpecial

    Public values for the "special" field attribute. Valid values are:

    - R: reset
    - S: sequence
    - T: timestamp
    - C: category
    - L: learning

    classmethod isValid(attr)

        Checks whether a candidate value is one of the valid "special"
        attributes.

        Parameters:
            attr – (string) candidate value
        Returns:
            True if the candidate value is a legitimate "special" field
            attribute; False if not
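A minimal sketch of the code-to-meaning mapping and the validity check described above. The real constants live on nupic.data.field_meta.FieldMetaSpecial; treating the "none" value as an empty string is an assumption made for illustration:

```python
# Sketch of the "special" attribute codes listed above (not the NuPIC
# class itself).
SPECIAL_CODES = {
    "R": "reset",
    "S": "sequence",
    "T": "timestamp",
    "C": "category",
    "L": "learning",
}

def is_valid_special(attr):
    """Mirrors FieldMetaSpecial.isValid(): accept a known code, or the
    empty string standing in for 'no special role' (assumption)."""
    return attr == "" or attr in SPECIAL_CODES

print(is_valid_special("T"), is_valid_special("X"))  # True False
```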
FileRecordStream

A CSV file based implementation of a record stream.

FileRecordStream is a class that can read and write .csv files that contain records. The file has three header lines that contain, for each field, the name (line 1), the type (line 2), and a special indicator (line 3). The special indicator can mark a field as the reset field, the sequence ID, or the timestamp for the record.

You can see an example of a NuPIC data file formatted for FileRecordStream in the quick start.

The header lines look like:
The header lines look like:
f1,f2,f3,....fN
int,string,datetime,bool,...
R,S,T,,,,....
The data lines are just comma-separated values that match the types in the second header line. The supported types are: int, float, string, bool, and datetime.

The format for datetime fields is yyyy-mm-dd hh:mm:ss.us, where the 'us' component is microseconds.

When reading a file, FileRecordStream automatically reads the header lines, determines the type of each field, and identifies the timestamp, reset, and sequenceId fields (if any).
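The three-line header plus data rows can be produced and consumed with the standard csv module. This is a stdlib-only sketch of the documented file format, not FileRecordStream itself (the field names and values are made up for illustration):

```python
import csv
import io

# Build a FileRecordStream-style CSV in memory: three header lines
# (field names, field types, special indicators), then data rows.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["timestamp", "gym", "consumption"])  # line 1: names
writer.writerow(["datetime", "string", "float"])      # line 2: types
writer.writerow(["T", "S", ""])                       # line 3: specials
writer.writerow(["2010-07-02 00:00:00.0", "Balgowlah", "5.3"])

# Read it back, consuming the three header lines first.
buf.seek(0)
reader = csv.reader(buf)
names = next(reader)
types = next(reader)
specials = next(reader)
first_row = dict(zip(names, next(reader)))
print(first_row["gym"])  # Balgowlah
```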
FileRecordStream supports the context manager (with statement) protocol. That means you can do:

    with FileRecordStream(filename) as f:
        ...
When control exits the with block, the file is closed automatically. You may still call the close() method at any point (even multiple times).
FileRecordStream also supports the iteration protocol, so you may read its contents using a for loop:

    for r in f:
        print r
class nupic.data.file_record_stream.FileRecordStream(streamID, write=False, fields=None, missingValues=None, bookmark=None, includeMS=True, firstRecord=None)

    CSV file based RecordStream implementation.

    Each field is a 3-tuple: (name, type, special or FieldMetaSpecial.none).

    The name is the name of the field. The type is one of the constants in
    FieldMetaType. The special is one of the FieldMetaSpecial values that
    designate the field as the sequenceId, reset, timestamp, or category
    field. With the exception of multiple categories, there can be at most
    one field of each special type. There may be multiple fields of type
    datetime, but no more than one of them may be the timestamp field
    (FieldMetaSpecial.timestamp). The sequence id field must be either a
    string or an int. The reset field must be an int (and must contain 0
    or 1).

    The category field must be an int or a space-separated list of ints,
    where the former represents single-label classification and the latter
    is for multi-label classification (e.g. "1 3 4" designates a record
    with labels 1, 3, and 4). The number of categories is allowed to vary
    from record to record; sensor regions represent non-categories with -1,
    thus the category values must be >= 0.
The FileRecordStream iterates over the field names, types and specials and stores the information.
Parameters: - streamID – CSV file name, input or output
- write – True or False, open for writing if True
- fields – a list of nupic.data.fieldmeta.FieldMetaInfo field descriptors, only applicable when write==True
- missingValues – what missing values should be replaced with?
- bookmark – a reference to the previous reader, if passed in, the records will be returned starting from the point where bookmark was requested. Either bookmark or firstRecord can be specified, not both. If bookmark is used, then firstRecord MUST be None.
- includeMS – If false, the microseconds portion is not included in the generated output file timestamp fields. This makes it compatible with reading in from Excel.
- firstRecord – 0-based index of the first record to start reading from. Either bookmark or firstRecord can be specified, not both. If bookmark is used, then firstRecord MUST be None.
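The category-field rules above (single int for single-label, space-separated ints for multi-label, all values >= 0) can be sketched as a small parser. The helper name parse_category is hypothetical, not part of the NuPIC API:

```python
def parse_category(value):
    """Parse a category cell per the rules above: a single int for
    single-label classification, or a space-separated list of ints for
    multi-label classification (e.g. "1 3 4")."""
    labels = [int(tok) for tok in str(value).split()]
    if any(label < 0 for label in labels):
        # Sensor regions reserve -1 for non-categories.
        raise ValueError("category values must be >= 0")
    return labels

print(parse_category("1 3 4"))  # [1, 3, 4]
print(parse_category(7))        # [7]
```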
    appendRecord(record)

        Saves the record in the underlying csv file.

        Parameters:
            record – a list of Python objects that will be string-ified

    appendRecords(records, progressCB=None)

        Saves multiple records in the underlying storage.

        Parameters:
            - records – array of records as in appendRecord()
            - progressCB – (function) callback to report progress
    clearStats()

        Resets stats collected so far.

    close()

        Closes the stream.

    flush()

        Flushes the file.
    getBookmark()

        Gets a bookmark or anchor to the current position.

        Returns:
            an anchor to the current position in the data. Passing this
            anchor to a constructor makes the current position the first
            returned record.

    getDataRowCount()

        Returns:
            (int) count of data rows in the dataset (excluding header lines)

    getError()

        Not implemented. The CSV file version does not provide storage for
        error information.

    getFieldNames()

        Returns:
            (list) field names associated with the data

    getFields()

        Returns:
            a sequence of FieldMetaInfo name/type/special tuples for each
            field in the stream
    getNextRecord(useCache=True)

        Returns the next available data record from the file.

        Returns:
            a data row (a list or tuple) if available; None if there are no
            more records in the table (End of Stream - EOS); an empty
            sequence (list or tuple) when timing out while waiting for the
            next record.

    getNextRecordIdx()

        Returns:
            (int) the index of the record that will be read next from
            getNextRecord()

    getStats()

        Parses the file using a dedicated reader and collects field stats.
        Never called if the user of FileRecordStream does not invoke the
        getStats() method.

        Returns:
            a dictionary of stats. In the current implementation, min and
            max fields are supported. Example of the return dictionary
            (where fx_min/fx_max are set for scalar fields, or None if not):

                {
                  'min': [f1_min, f2_min, None, None, fn_min],
                  'max': [f1_max, f2_max, None, None, fn_max]
                }
    isCompleted()

        Not implemented. The CSV file is always considered completed.

    next()

        Implements the iterator protocol.

    recordsExistAfter(bookmark)

        Returns whether there are more records after the current position.
        bookmark is not used in this implementation.

        Returns:
            True if there are records left after the current position

    rewind()

        Puts us back at the beginning of the file again.

    seekFromEnd(numRecords)

        Seeks to numRecords from the end and returns a bookmark to the new
        position.

        Parameters:
            numRecords – how far to seek from the end of the file
        Returns:
            bookmark to the desired location

    setAutoRewind(autoRewind)

        Controls whether getNextRecord() should automatically rewind the
        source when EOF is reached.

        Parameters:
            autoRewind – (bool) if True, getNextRecord() will automatically
            rewind the source on EOF; if False, it will not.

    setCompleted(completed=True)

        Not implemented: the CSV file is always considered completed;
        nothing to do.

    setError(error)

        Not implemented. The CSV file version does not provide storage for
        error information.
RecordStream

Interface for different types of storage (file, hbase, rio, etc.).

class nupic.data.record_stream.RecordStreamIface

    This is the interface for the record input/output storage classes.
    appendRecord(record)

        Saves the record in the underlying storage. Should be implemented
        in subclasses.

        Parameters:
            record – (object) to store

    appendRecords(records, progressCB=None)

        Saves multiple records in the underlying storage. Should be
        implemented in subclasses.

        Parameters:
            - records – (list) of objects to store
            - progressCB – (func) called after each append

    clearStats()

        Resets stats collected so far.

    close()

        Closes the stream.

    flush()

        Flushes the file to disk.
    getAggregationMonthsAndSeconds()

        Returns the aggregation period of the record stream as a dict
        containing 'months' and 'seconds'. The months value is always an
        integer and the seconds value is a floating point; only one is
        allowed to be non-zero.

        If there is no aggregation associated with the stream, returns None.

        Typically, a raw file or hbase stream will NOT have any aggregation
        info, but subclasses of RecordStreamIface, like StreamReader, will,
        and will return the aggregation period from this call. This call is
        used by getNextRecordDict() to assign a record number to a record
        given its timestamp and the aggregation interval.

        Returns:
            None

    getBookmark()

        Returns an anchor to the current position in the data. Passing this
        anchor to the constructor makes the current position the first
        returned record. If the record is no longer in the storage, the
        first available record after it will be returned.

        Returns:
            anchor to the current position in the data
    getCategoryFieldIdx()

        Returns:
            (int) index of the category field

    getError()

        Returns:
            errors saved in the storage

    getFieldMax(fieldName)

        If the underlying implementation does not support min/max stats
        collection, or if the field type does not support min/max
        (non-scalars), the return value will be None.

        Parameters:
            fieldName – (string) name of the field to get the max for
        Returns:
            current maximum value for the field fieldName

    getFieldMin(fieldName)

        If the underlying implementation does not support min/max stats
        collection, or if the field type does not support min/max
        (non-scalars), the return value will be None.

        Parameters:
            fieldName – (string) name of the field to get the min for
        Returns:
            current minimum value for the field fieldName

    getFieldNames()

        Returns:
            (list) of field names associated with the data

    getFields()

        Returns:
            (list) of nupic.data.fieldmeta.FieldMetaInfo objects for each
            field in the stream. Might be None if that information is
            provided externally (through a stream def, for example).

    getLearningFieldIdx()

        Returns:
            (int) index of the learning field
    getNextRecord(useCache=True)

        Returns the next available data record from the storage. If
        useCache is False, then don't read ahead and don't cache any
        records.

        Returns:
            a data row (a list or tuple) if available; None if there are no
            more records in the table (End of Stream - EOS); an empty
            sequence (list or tuple) when timing out while waiting for the
            next record.

    getNextRecordDict()

        Returns the next available data record from the storage as a dict,
        with the keys being the field names. This also adds in some meta
        fields:

        - _category: the value from the category field (if any)
        - _reset: True if the reset field was True (if any)
        - _sequenceId: the value from the sequenceId field (if any)
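The shape of the dict that getNextRecordDict() is described to return can be sketched as follows. record_to_dict is a hypothetical helper for illustration, not part of the NuPIC API:

```python
def record_to_dict(row, names, category_idx=None, reset_idx=None,
                   sequence_idx=None):
    """Build a dict of field values keyed by field name, plus the
    _category, _reset, and _sequenceId meta fields when the corresponding
    special fields exist (mirrors the description above)."""
    result = dict(zip(names, row))
    if category_idx is not None:
        result["_category"] = row[category_idx]
    if reset_idx is not None:
        result["_reset"] = bool(row[reset_idx])
    if sequence_idx is not None:
        result["_sequenceId"] = row[sequence_idx]
    return result

d = record_to_dict([1, "s1", 5.3], ["reset", "id", "consumption"],
                   reset_idx=0, sequence_idx=1)
print(d["_reset"], d["_sequenceId"])  # True s1
```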
    getNextRecordIdx()

        Returns:
            (int) index of the record that will be read next from
            getNextRecord()

    getResetFieldIdx()

        Returns:
            (int) index of the reset field; None if no such field exists

    getSequenceIdFieldIdx()

        Returns:
            (int) index of the sequenceId field

    getStats()

        Returns:
            storage stats (like min and max values of the fields)

    getTimestampFieldIdx()

        Returns:
            (int) index of the timestamp field

    isCompleted()

        Returns:
            True if all records are already in the storage, or False if
            more records are expected

    recordsExistAfter(bookmark)

        Parameters:
            bookmark – (int) where to start
        Returns:
            True if there are records left after the bookmark

    rewind()

        Puts us back at the beginning of the file again.

    seekFromEnd(numRecords)

        Parameters:
            numRecords – (int) number of records from the end
        Returns:
            (int) a bookmark numRecords from the end of the stream

    setCompleted(completed)

        Marks the stream completed.

        Parameters:
            completed – (bool) is completed?

    setError(error)

        Saves the specified error in the storage.

        Parameters:
            error – error to store

    setTimeout(timeout)

        Sets the read timeout in seconds.

        Parameters:
            timeout – (int or floating point)
StreamReader

class nupic.data.stream_reader.StreamReader(streamDef, bookmark=None, saveOutput=False, isBlocking=True, maxTimeout=0, eofOnTimeout=False)

    Implements a stream reader. This is a high-level class that owns one or
    more underlying implementations of a RecordStreamIface. Each
    RecordStreamIface implements the raw reading of records from the record
    store (which could be a file, an hbase table, or something else).

    In the future, we will support joining of two or more
    RecordStreamIface's (which is why the streamDef accepts a list of
    'stream' elements), but for now only one source is supported.

    The class also implements aggregation of the (in the future) joined
    records from the sources.

    This module parses the stream definition (as defined in
    /src/nupic/frameworks/opf/jsonschema/stream_def.json), creates the
    RecordStreamIface for each source ('stream' element) defined in the
    stream def, performs aggregation, and returns each record in the
    correct format according to the desired column names specified in the
    streamDef.

    This class implements the RecordStreamIface interface and thus can be
    used in place of a raw record stream.

    This is an example streamDef:

        {
          'version': 1,
          'info': 'test_hotgym',
          'streams': [
            {'columns': [u'*'],
             'info': u'hotGym.csv',
             'last_record': 4000,
             'source': u'file://extra/hotgym/hotgym.csv'},
          ],
          'timeField': 'timestamp',
          'aggregation': {
            'hours': 1,
            'fields': [
              ('timestamp', 'first'),
              ('gym', 'first'),
              ('consumption', 'sum')
            ],
          }
        }
    Parameters:
        - streamDef – the stream definition, potentially containing
          multiple sources (not supported yet). See
          src/nupic/frameworks/opf/jsonschema/stream_def.json for the
          format of this dict.
        - bookmark – bookmark to start reading from. This overrides the
          first_record field of the streamDef if provided.
        - saveOutput – if true, save the output to a csv file in a temp
          directory. The path to the generated file can be found in the
          log output.
        - isBlocking – should the read operation block forever if the next
          row of data is not available, but the stream is not marked as
          'completed' yet?
        - maxTimeout – if isBlocking is False, max seconds to wait for more
          data before timing out; ignored when isBlocking is True.
        - eofOnTimeout – if True and we get a read timeout (isBlocking must
          be False to get read timeouts), assume we've reached the end of
          the input and produce the last aggregated record, if one can be
          completed.
    clearStats()

        Resets stats collected so far.

    close()

        Closes the stream.

    getAggregationMonthsAndSeconds()

        Returns the aggregation period of the record stream as a dict
        containing 'months' and 'seconds'. The months value is always an
        integer and the seconds value is a floating point; only one is
        allowed to be non-zero at a time.

        Will return the aggregation period from this call. This call is
        used by the nupic.data.record_stream.RecordStream.getNextRecordDict()
        method to assign a record number to a record given its timestamp
        and the aggregation interval.

        Returns:
            aggregationPeriod (as a dict) where:

            - months: number of months in the aggregation period
            - seconds: number of seconds in the aggregation period (as a
              float)
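The record-number assignment described above can be sketched for the simple case where the aggregation period is expressed purely in seconds (months equal to zero). This is an illustrative helper under that assumption, not NuPIC's actual implementation:

```python
from datetime import datetime

def record_number(ts, stream_start, agg_seconds):
    """Assign a record number from a timestamp and an aggregation period,
    assuming the period's 'months' component is zero: each record number
    covers one agg_seconds-wide interval from the stream start."""
    return int((ts - stream_start).total_seconds() // agg_seconds)

start = datetime(2010, 7, 2, 0, 0, 0)
# With a 1-hour aggregation period, 02:30 falls into record number 2.
print(record_number(datetime(2010, 7, 2, 2, 30, 0), start, 3600.0))  # 2
```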
    getBookmark()

        Returns:
            a bookmark to the current position

    getDataRowCount()

        Iterates through the stream to calculate the total number of
        records after aggregation. This will alter the bookmark state.

    getError()

        Returns:
            errors saved in the stream

    getFieldNames()

        Returns all fields in all inputs (list of plain names).

        Note: currently, only one input is supported.

    getFields()

        Returns:
            a sequence of nupic.data.fieldmeta.FieldMetaInfo for each field
            in the stream

    getNextRecord()

        Returns combined data from all sources (values only).

        Returns:
            None on EOF; empty sequence on timeout

    getNextRecordIdx()

        Returns:
            the index of the record that will be read next from
            getNextRecord()

    getStats()

        TODO: This method needs to be enhanced to get the stats on the
        aggregated records.

        Returns:
            stats (like min and max values of the fields)

    isCompleted()

        Returns:
            True if all records have been read

    recordsExistAfter(bookmark)

        Returns:
            True if there are records left after the bookmark

    setCompleted(completed=True)

        Marks the stream completed (True or False).

        Parameters:
            completed – (bool) is completed or not

    setError(error)

        Saves the specified error in the stream.

        Parameters:
            error – error to save

    setTimeout(timeout)

        Sets the read timeout.

        Parameters:
            timeout – (float or int) timeout length
Utilities

Collection of utilities to process input data.

nupic.data.utils.DATETIME_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S:%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y-%m-%d', '%m/%d/%Y %H:%M', '%m/%d/%y %H:%M', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S')

    These are the supported timestamp formats to parse. The first is the
    format used by NuPIC when serializing datetimes.
    nupic.data.utils.escape(s)

        Escapes commas, tabs, newlines, and dashes in a string. Commas are
        encoded as tabs.

        Parameters:
            s – (string) to escape
        Returns:
            (string) escaped string
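The documented core of this transformation (commas encoded as tabs) can be sketched as follows. This deliberately omits the tab, newline, and dash handling that the real escape() also performs, and the helper names are hypothetical:

```python
def escape_commas(s):
    """Encode commas as tabs so the value can sit in a CSV cell. Only
    the comma/tab substitution documented above; the real
    nupic.data.utils.escape also handles tabs, newlines, and dashes."""
    return s.replace(",", "\t")

def unescape_commas(s):
    """Reverse of escape_commas(): decode tabs back to commas."""
    return s.replace("\t", ",")

print(escape_commas("a,b,c") == "a\tb\tc")  # True
```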
    nupic.data.utils.floatOrNone(f)

        Tries to convert the input to a float, or returns None.

        Parameters:
            f – (object) thing to convert to a float
        Returns:
            (float or None)

    nupic.data.utils.intOrNone(i)

        Tries to convert the input to an int, or returns None.

        Parameters:
            i – (object) thing to convert to an int
        Returns:
            (int or None)
    nupic.data.utils.parseBool(s)

        String to boolean.

        Parameters:
            s – (string)
        Returns:
            (bool)

    nupic.data.utils.parseSdr(s)

        Parses a string containing only 0's and 1's and returns a Python
        list object.

        Parameters:
            s – (string) string to parse
        Returns:
            (list) SDR out
    nupic.data.utils.parseStringList(s)

        Parses a string of space-separated numbers, returning a Python
        list.

        Parameters:
            s – (string) to parse
        Returns:
            (list) binary SDR
    nupic.data.utils.parseTimestamp(s)

        Parses a textual datetime format and returns a Python datetime
        object.

        The supported format is: yyyy-mm-dd h:m:s.ms

        The time component is optional.

        - hours are 00..23 (no AM/PM)
        - minutes are 00..59
        - seconds are 00..59
        - microseconds are 000000..999999

        Parameters:
            s – (string) input time text
        Returns:
            (datetime.datetime)
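A plausible stdlib equivalent of this behavior tries each supported format in order, using the DATETIME_FORMATS tuple documented above (copied verbatim). This is a sketch, not NuPIC's actual implementation:

```python
from datetime import datetime

# Formats copied verbatim from nupic.data.utils.DATETIME_FORMATS.
DATETIME_FORMATS = (
    '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S:%f', '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%d %H:%M', '%Y-%m-%d', '%m/%d/%Y %H:%M', '%m/%d/%y %H:%M',
    '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S')

def parse_timestamp(s):
    """Return the first datetime produced by trying each supported
    format in order; raise ValueError if none match."""
    for fmt in DATETIME_FORMATS:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    raise ValueError("unrecognized timestamp: %r" % s)

print(parse_timestamp("2010-07-02 14:30:00"))
```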
    nupic.data.utils.serializeSdr(sdr)

        Serializes a Python list object containing only 0's and 1's to a
        string.

        Parameters:
            sdr – (list) binary
        Returns:
            (string) SDR out
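The parseSdr/serializeSdr round trip can be sketched in a few lines, assuming the string carries one character per bit with no separators (the helper names are hypothetical):

```python
def parse_sdr(s):
    """Parse a string of 0s and 1s into a list of ints (cf. parseSdr),
    assuming one character per bit."""
    return [int(c) for c in s]

def serialize_sdr(sdr):
    """Inverse of parse_sdr (cf. serializeSdr): join bits into a string."""
    return "".join(str(bit) for bit in sdr)

print(serialize_sdr(parse_sdr("0110")))  # 0110
```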
    nupic.data.utils.serializeTimestamp(t)

        Turns a datetime object into a string.

        Parameters:
            t – (datetime.datetime)
        Returns:
            (string) in the default format (see DATETIME_FORMATS[0])

    nupic.data.utils.serializeTimestampNoMS(t)

        Turns a datetime object into a string, ignoring milliseconds.

        Parameters:
            t – (datetime.datetime)
        Returns:
            (string) in the default format (see DATETIME_FORMATS[2])

    nupic.data.utils.stripList(listObj)

        Converts a list of numbers to a string of space-separated values.

        Parameters:
            listObj – (list) to convert
        Returns:
            (string) of space-separated values

    nupic.data.utils.unescape(s)

        Unescapes a string that may contain commas, tabs, newlines, and
        dashes. Commas are decoded from tabs.

        Parameters:
            s – (string) to unescape
        Returns:
            (string) unescaped string