| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- ## @package dataset
- # Module caffe2.python.dataset
- """
- Implementation of an in-memory dataset with structured schema.
- Use this to store and iterate through datasets with complex schema that
- fit in memory.
- Iterating through entries of this dataset is very fast since the dataset
- is stored as a set of native Caffe2 tensors, thus no type conversion or
- deserialization is necessary.
- """
- from caffe2.python import core, workspace
- from caffe2.python.dataio import Reader, Writer
- from caffe2.python.schema import (
- Struct, from_blob_list, from_column_list, InitEmptyRecord)
- import numpy as np
- class _DatasetReader(Reader):
- def __init__(self, dataset, name, batch_size=1, enforce_batch_size=False):
- """Don't call this directly. Instead, use dataset.reader()"""
- Reader.__init__(self, dataset.content())
- self.dataset = dataset
- self.name = name or (dataset.name + '_cursor')
- self.batch_size = batch_size
- self.enforce_batch_size = enforce_batch_size
- self.cursor = None
- def setup_ex(self, init_net, exit_net):
- if self.cursor is None:
- self.cursor = init_net.CreateTreeCursor(
- [],
- init_net.NextScopedBlob(self.name),
- fields=self.dataset.fields)
- def read(self, read_net):
- assert self.cursor, 'setup not called.'
- content = self.dataset.content()
- with core.NameScope(read_net.NextName(self.name)):
- fields = read_net.ReadNextBatch(
- [self.cursor] + content.field_blobs(),
- content.field_names(),
- batch_size=self.batch_size,
- enforce_batch_size=self.enforce_batch_size)
- fields = core.output_to_list(fields)
- return (read_net.IsEmpty([fields[0]]), fields)
- def reset(self, net):
- net.ResetCursor([self.cursor], [])
- class _DatasetRandomReader(Reader):
- def __init__(self, dataset, name, indices, batch_size=1, loop_over=False,
- enforce_batch_size=False):
- """Don't call this directly. Instead, use dataset.random_reader()"""
- Reader.__init__(self, dataset.content())
- self.dataset = dataset
- self.cursor = None
- self.name = name or (dataset.name + '_cursor')
- self.indices = indices
- self.batch_size = batch_size
- self.loop_over = loop_over
- self.enforce_batch_size = enforce_batch_size
- def setup_ex(self, init_net, exit_net):
- if self.cursor is None:
- self.cursor = init_net.CreateTreeCursor(
- [],
- init_net.NextScopedBlob(self.name),
- fields=self.dataset.fields)
- def reset(self, net):
- net.ResetCursor([self.cursor], [])
- def computeoffset(self, net):
- self.reset(net)
- offsets = net.ComputeOffset(
- [self.cursor] + self.dataset.content().field_blobs(),
- 'offsets')
- self.offsets = offsets
- def sort_and_shuffle(self, net, sort_by_field=None,
- shuffle_size=1, batch_size=1):
- # no sorting by default
- content = self.dataset.content()
- sort_by_field_idx = -1
- if sort_by_field:
- assert sort_by_field in content.field_names(), (
- 'Must be valid field.')
- sort_by_field_idx = content.field_names().index(sort_by_field)
- self.reset(net)
- indices = net.SortAndShuffle(
- [self.cursor] + content.field_blobs(),
- 'indices',
- sort_by_field_idx=sort_by_field_idx,
- shuffle_size=shuffle_size,
- batch_size=batch_size)
- self.indices = indices
- def read(self, read_net):
- assert self.cursor, 'setup_ex not called'
- assert self.indices, 'sort_and_shuffle not called'
- assert self.offsets, 'computeoffset not called'
- content = self.dataset.content()
- with core.NameScope(read_net.NextName(self.name)):
- fields = read_net.ReadRandomBatch(
- [self.cursor, self.indices, self.offsets] + (
- content.field_blobs()),
- content.field_names(),
- batch_size=self.batch_size,
- enforce_batch_size=self.enforce_batch_size,
- loop_over=self.loop_over)
- fields = core.output_to_list(fields)
- return (read_net.IsEmpty([fields[0]]), fields)
- class _DatasetWriter(Writer):
- def __init__(self, content):
- """Don't call this directly. Use dataset.writer() instead."""
- self._content = content
- self.mutex = None
- def setup_ex(self, init_net, exit_net):
- if self.mutex is None:
- self.mutex = init_net.CreateMutex([])
- def write(self, writer_net, fields):
- """
- Add operations to `net` that append the blobs in `fields` to the end
- of the dataset. An additional operator will also be added that checks
- the consistency of the data in `fields` against the dataset schema.
- Args:
- writer_net: The net that will contain the Append operators.
- fields: A list of BlobReference to be appeneded to this dataset.
- """
- assert self.mutex is not None, 'setup not called.'
- field_blobs = self._content.field_blobs()
- assert len(fields) == len(field_blobs), (
- 'Expected %s fields, got %s.' % (len(field_blobs), len(fields)))
- writer_net.CheckDatasetConsistency(
- fields, [], fields=self._content.field_names())
- writer_net.AtomicAppend(
- [self.mutex] + field_blobs + list(fields),
- field_blobs)
- def commit(self, finish_net):
- """Commit is a no-op for an in-memory dataset."""
- pass
- def Const(net, value, dtype=None, name=None):
- """
- Create a 'constant' by first creating an external input in the given
- net, and then feeding the corresponding blob with its provided value
- in the current workspace. The name is automatically generated in order
- to avoid clashes with existing blob names.
- """
- assert isinstance(net, core.Net), 'net must be a core.Net instance.'
- value = np.array(value, dtype=dtype)
- blob = net.AddExternalInput(net.NextName(prefix=name))
- workspace.FeedBlob(str(blob), value)
- return blob
- def execution_step_with_progress(name, init_net, substeps, rows_read):
- # progress reporter
- report_net = core.Net('report_net')
- report_net.Print([rows_read], [])
- return core.execution_step(
- name,
- substeps,
- report_net=report_net,
- concurrent_substeps=True,
- report_interval=5)
- class Dataset(object):
- """Represents an in-memory dataset with fixed schema.
- Use this to store and iterate through datasets with complex schema that
- fit in memory.
- Iterating through entries of this dataset is very fast since the dataset
- is stored as a set of native Caffe2 tensors, thus no type conversion or
- deserialization is necessary.
- """
- def __init__(self, fields, name=None):
- """Create an un-initialized dataset with schema provided by `fields`.
- Before this dataset can be used, it must be initialized, either by
- `init_empty` or `init_from_dataframe`.
- Args:
- fields: either a schema.Struct or a list of field names in a format
- compatible with the one described in schema.py.
- name: optional name to prepend to blobs that will store the data.
- """
- assert isinstance(fields, list) or isinstance(fields, Struct), (
- 'fields must be either a Struct or a list of raw field names.')
- if isinstance(fields, list):
- fields = from_column_list(fields)
- self.schema = fields
- self.fields = fields.field_names()
- self.field_types = fields.field_types()
- self.name = name or 'dataset'
- self.field_blobs = fields.field_blobs() if fields.has_blobs() else None
- def trim(self, net, multiple_of):
- """
- Trims the contents of this dataset so that the number of records is
- multiple of the given argument.
- """
- net.TrimDataset(
- self.field_blobs,
- self.field_blobs,
- fields=self.fields,
- multiple_of=multiple_of)
- def init_empty(self, init_net):
- """Initialize the blobs for this dataset with empty values.
- Empty arrays will be immediately fed into the current workspace,
- and `init_net` will take those blobs as external inputs.
- """
- self.field_blobs = InitEmptyRecord(
- init_net, self.schema.clone_schema()).field_blobs()
- def init_from_dataframe(self, net, dataframe):
- """Initialize the blobs for this dataset from a Pandas dataframe.
- Each column of the dataframe will be immediately fed into the current
- workspace, and the `net` will take this blobs as external inputs.
- """
- assert len(self.fields) == len(dataframe.columns)
- self.field_blobs = [
- Const(net, dataframe.as_matrix([col]).flatten(), name=field)
- for col, field in enumerate(self.fields)]
- def get_blobs(self):
- """
- Return the list of BlobReference pointing to the blobs that contain
- the data for this dataset.
- """
- assert self
- return self.field_blobs
- def content(self):
- """
- Return a Record of BlobReferences pointing to the full content of
- this dataset.
- """
- return from_blob_list(self.schema, self.field_blobs)
- def field_names(self):
- """Return the list of field names for this dataset."""
- return self.fields
- def field_types(self):
- """
- Return the list of field dtypes for this dataset.
- If a list of strings, not a schema.Struct, was passed to the
- constructor, this will return a list of dtype(np.void).
- """
- return self.field_types
- def reader(self, init_net=None, cursor_name=None, batch_size=1,
- enforce_batch_size=False):
- """Create a Reader object that is used to iterate through the dataset.
- This will append operations to `init_net` that create a TreeCursor,
- used to iterate through the data.
- NOTE: Currently, it is not safe to append to a dataset while reading.
- Args:
- init_net: net that will be run once to create the cursor.
- cursor_name: optional name for the blob containing a pointer
- to the cursor.
- batch_size: how many samples to read per iteration.
- Returns:
- A _DatasetReader that can be used to create operators that will
- iterate through the dataset.
- """
- assert self.field_blobs, 'Dataset not initialized.'
- reader = _DatasetReader(self, cursor_name, batch_size,
- enforce_batch_size)
- if init_net is not None:
- reader.setup_ex(init_net, None)
- return reader
- def random_reader(self, init_net=None, indices=None, cursor_name=None,
- batch_size=1, loop_over=False, enforce_batch_size=False):
- """Create a Reader object that is used to iterate through the dataset.
- NOTE: The reader order depends on the order in indices.
- Args:
- init_net: net that will be run once to create the cursor.
- indices: blob of reading order
- cursor_name: optional name for the blob containing a pointer
- to the cursor.
- batch_size: how many samples to read per iteration.
- loop_over: repeat the dataset indefinitely (in the same order)
- Returns:
- A DatasetReader that can be used to create operators that will
- iterate through the dataset according to indices.
- """
- assert self.field_blobs, 'Dataset not initialized.'
- reader = _DatasetRandomReader(
- self, cursor_name, indices, batch_size, loop_over,
- enforce_batch_size)
- if init_net is not None:
- reader.setup_ex(init_net, None)
- return reader
- def writer(self, init_net=None):
- """Create a Writer that can be used to append entries into the dataset.
- NOTE: Currently, it is not safe to append to a dataset
- while reading from it.
- NOTE: Currently implementation of writer is not thread safe.
- TODO: fixme
- Args:
- init_net: net that will be run once in order to create the writer.
- (currently not used)
- """
- assert self.field_blobs, 'Dataset not initialized.'
- writer = _DatasetWriter(self.content())
- if init_net is not None:
- writer.setup_ex(init_net, None)
- return writer
|