| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
## @package pipeline
# Module caffe2.python.pipeline
- from caffe2.python import core, queue_util
- from caffe2.python.dataio import Reader, Writer
- from caffe2.python.net_builder import NetBuilder, ops
- from caffe2.python.schema import as_record, Field
- from caffe2.python.task import Node, Task, TaskGroup
class Output(object):
    """
    Result of a processor function.

    A processor may return an Output directly, or it may return a plain
    record, in which case an Output is built around that record afterwards.
    """

    def __init__(self, nets=None, record=None, should_stop=None):
        # Children collected by the current NetBuilder; per the assertion
        # message these are populated when the `ops` syntax is used.
        implicit_nets = NetBuilder.current().get()
        assert nets is None or len(implicit_nets) == 0, (
            'Cannot both use `ops` syntax and return a list of nets.')

        # Fall back to the builder's children when no nets were passed.
        if nets is None:
            nets = implicit_nets
        # A single net is accepted and wrapped into a one-element list.
        if isinstance(nets, core.Net):
            nets = [nets]

        if nets is None:
            self.nets = []
        else:
            self.nets = list(nets)

        if record is None:
            self.record = None
        else:
            self.record = as_record(record)

        self.should_stop = should_stop
# Capacity of the queue that `_init_output` creates when the caller provides
# neither an output nor an explicit capacity.
DEFAULT_QUEUE_CAPACITY = 100
def _init_output(output, capacity, global_init_net, global_exit_net):
    """
    Resolve `output` into a writer and set that writer up.

    Args:
        output: None, a Writer, or any object exposing a `writer` attribute
            (the error message names queues and streams).
        capacity: capacity of the queue created when `output` is None;
            DEFAULT_QUEUE_CAPACITY is used when this is None as well. Must
            be None when `output` is given, because it would be ignored.
        global_init_net: net passed to `writer.setup_ex` as first argument.
        global_exit_net: net passed to `writer.setup_ex` as second argument.

    Returns:
        Tuple (out_queue, writer). `out_queue` is the newly created queue,
        `output` itself when it exposes `writer`, or None when `output` is
        already a Writer.

    Raises:
        ValueError: if `output` is not None, not a Writer and has no
            `writer` attribute.
    """
    if output is None:
        out_queue = queue_util.Queue(
            capacity=(
                capacity if capacity is not None
                else DEFAULT_QUEUE_CAPACITY))
        writer = out_queue.writer()
    elif isinstance(output, Writer):
        assert capacity is None, 'capacity would not be used.'
        out_queue = None
        writer = output
    elif hasattr(output, 'writer'):
        assert capacity is None, 'capacity would not be used.'
        out_queue = output
        writer = output.writer()
    else:
        # This validates the output side, so the message has to talk about
        # writers (it previously said "reader").
        raise ValueError('output must be a writer, queue or stream.')
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer
def make_processor(processor, reader=None):
    """
    Normalize `processor` into something callable on a record.

    - None yields an identity function.
    - A core.Net is wrapped into a NetProcessor.
    - Anything else is returned as is; when a `reader` is given and the
      processor has a `schema_func` attribute, a zero-argument `schema`
      attribute is attached to it that calls `schema_func(reader)`.
    """
    if processor is None:
        return lambda rec: rec

    if isinstance(processor, core.Net):
        return NetProcessor(processor)

    needs_schema = reader is not None and hasattr(processor, "schema_func")
    if needs_schema:
        def processor_schema():
            return processor.schema_func(reader)

        # Note: this sets an attribute on the object the caller passed in.
        processor.schema = processor_schema
    return processor
def normalize_processor_output(output):
    """
    Convert whatever a processor returned into an Output instance.

    Accepted shapes: an Output, a record (Field), a (record, stop_blob)
    tuple, a (nets, record, stop_blob) tuple, or just nets.
    TODO(azzolini): simplify once all processors use NetBuilder API.
    """
    # Processor returned an Output.
    if isinstance(output, Output):
        return output

    # Processor returned a record.
    if isinstance(output, Field):
        return Output(record=output)

    # Processor returned nets, no output.
    if not isinstance(output, tuple):
        return Output(output)

    looks_like_record_and_stop = (
        len(output) == 2 and
        isinstance(output[0], Field) and
        isinstance(output[1], core.BlobReference))
    if looks_like_record_and_stop:
        # Processor returned (record, stop_blob).
        record, stop_blob = output
        return Output(None, record, stop_blob)

    # Processor returned (nets, record, stop_blob).
    return Output(*output)
def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1):
    """
    Create a Task that pipes `input` into `output` using parallel threads.

    `input` is a Reader, Queue or DataStream; `output` is optionally a
    Writer, Queue or DataStream. If a processor is given, it is called
    between the read and write steps so that it can transform the record.

    Args:
        input:       either a Reader, Queue or DataStream that will be read
                     until a stop is signaled either by the reader or the
                     writer.
        output:      either a Writer, a Queue or a DataStream that will be
                     written to as long as neither reader nor writer signal
                     a stop condition. If output is not provided or is None,
                     a Queue is created with given `capacity` and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. If set to 0, no Task is created, and a
                     reader is returned instead -- the reader returned will
                     read from the reader passed in and process it.
                     ** DEPRECATED **. Use `num_runtime_threads` instead.
                     This option will be removed once all readers/processors
                     support `num_runtime_threads`.
        processor:   (optional) function that takes an input record and
                     optionally returns a record; this will be called
                     between read and write steps. If the processor does
                     not return a record, a writer will not be instantiated.
                     Processor can also be a core.Net with input and output
                     records properly set. In that case, a NetProcessor is
                     instantiated, cloning the net for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    when output is not passed, a queue of given `capacity`
                     is created and written to.
        group:       (optional) explicitly add the created Task to this
                     TaskGroup, instead of using the currently active one.
        num_runtime_threads: similar to `num_threads`, but instead of
                     expanding the tasks with a `for` loop in python, does
                     that at runtime. This is preferable to `num_threads`,
                     but some processors/readers still require to be called
                     multiple times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the parameters
        passed.
    """
    # Only the first element of the step result is exposed here; the Task
    # itself is dropped.
    piped, _task = _pipe_step(
        input=input,
        output=output,
        num_threads=num_threads,
        processor=processor,
        name=name,
        capacity=capacity,
        group=group,
        num_runtime_threads=num_runtime_threads)
    return piped
def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
    """
    Like `pipe`, but the pipe Task can also hand output values back to the
    `Session` once it is done.

    Returns:
        Tuple (out_queue, *task_outputs)
            out_queue:    same as return value of `pipe`.
            task_outputs: TaskOutput object, fetchable from the client after
                          session.run() returns. None when `final_outputs`
                          is None; a single element when `final_outputs` is
                          not exactly a list or tuple.
    """
    assert num_threads > 0
    result, task = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        num_runtime_threads, final_outputs)

    if final_outputs is None:
        return result, None

    task_outputs = task.outputs()
    # A scalar `final_outputs` gets a scalar back instead of a sequence.
    if type(final_outputs) not in (list, tuple):
        task_outputs = task_outputs[0]
    return result, task_outputs
def processor_name(processor):
    """
    Return a human-readable name for `processor`.

    Resolution order:
      1. `processor.name` when the attribute exists.
      2. For functions and methods: the module name for lambdas,
         'ClassName.function_name' for bound methods, otherwise the plain
         function name.
      3. The name of the object's class.

    The Python 2 attributes (`func_name`, `im_class`) are consulted first and
    the Python 3 equivalents (`__name__`, `__self__`) are used as fallback;
    without that fallback every function is reported as 'function' on
    Python 3.
    """
    if hasattr(processor, 'name'):
        return processor.name

    # `func_name` only exists on Python 2 callables.
    func_name = getattr(processor, 'func_name', None)
    if func_name is None:
        func_name = getattr(processor, '__name__', None)

    if func_name is not None:
        if func_name == '<lambda>':
            # Lambdas carry no useful name; use the defining module instead.
            return processor.__module__

        owner = getattr(processor, 'im_class', None)
        # Python 3 bound methods expose `__func__`/`__self__` rather than
        # `im_class`. Requiring `__func__` keeps builtins (whose `__self__`
        # is a module) out of this branch.
        if owner is None and hasattr(processor, '__func__'):
            bound_to = getattr(processor, '__self__', None)
            if bound_to is not None:
                # For classmethods `__self__` is already the class.
                owner = (
                    bound_to if isinstance(bound_to, type)
                    else type(bound_to))
        if owner is not None:
            return '%s.%s' % (owner.__name__, func_name)
        return func_name

    return processor.__class__.__name__
def _runtime_threads_task(name, group, final_outputs, reader, num_threads,
                          output, capacity):
    """
    Build a pipe Task whose parallelism is expressed through the Task's
    `num_instances` instead of unrolling threads in python.

    Args:
        name: name of the Task; also part of the timer counter name.
        group: forwarded to Task as `group`.
        final_outputs: forwarded to Task as `outputs`.
        reader: object providing `setup_ex` and `read_record_ex`.
        num_threads: forwarded to Task as `num_instances`.
        output: None, a Writer, or an object with `writer()`; resolved by
            `_init_output` only if the reader yields a record.
        capacity: forwarded to `_init_output`.

    Returns:
        Tuple (out_queue, task). `out_queue` is what `_init_output` returned,
        or None when the reader yields no record (nothing is written then).
    """
    node_name = str(Node.current())
    # The input side is described by `reader`. This function has no `input`
    # argument, so that bare name would resolve to the Python builtin.
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader is not None else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs,
              num_instances=num_threads) as task:
        global_exit_net = core.Net('pipe:exit')
        global_init_net = core.Net('pipe:init')
        reader.setup_ex(global_init_net, global_exit_net)

        init_net = core.Net('pipe:instance:init')
        exit_net = core.Net('pipe:instance:exit')
        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
        # The stop-status blob starts out False in the instance init net.
        init_net.ConstantFill(
            [], [status],
            shape=[],
            value=False,
            dtype=core.DataType.BOOL
        )

        if rec is not None:
            out_queue, writer = _init_output(
                output, capacity, global_init_net, global_exit_net)
            write_nets, _ = writer.write_record_ex(
                rec, init_net, exit_net, status)
        else:
            out_queue = None
            write_nets = []

        with ops.task_init():
            ops.net(global_init_net)
        with ops.task_instance_init():
            ops.net(init_net)

        timer_start_net = core.Net('timer_start')
        timer = timer_start_net.TimerBegin([], counter_name=profiler_name)
        timer_end_net = core.Net('timer_end')
        timer_end_net.TimerEnd(timer, [])

        # Body: start timer, read, write, end timer; stops on `status`.
        ops.net(core.execution_step(
            'body',
            [timer_start_net] + list(read_nets) + list(write_nets) +
            [timer_end_net],
            should_stop_blob=status))
        # NOTE(review): the end-timer net is run once more after the body,
        # presumably to close a timer left open when the body stops
        # mid-iteration -- confirm.
        ops.net(timer_end_net)

        with ops.task_instance_exit():
            ops.net(exit_net)
        with ops.task_exit():
            ops.net(global_exit_net)

    return out_queue, task
def _static_threads_task(name, group, final_outputs, reader, num_threads,
                         output, capacity):
    """
    Build a pipe Task by unrolling `num_threads` read/write steps in python
    and running them as concurrent substeps.

    Args:
        name: name of the Task; also part of the timer counter name.
        group: forwarded to Task as `group`.
        final_outputs: forwarded to Task as `outputs`.
        reader: object providing `setup_ex` and `read_record_ex`; the latter
            is called once per thread.
        num_threads: number of per-thread steps to create.
        output: None, a Writer, or an object with `writer()`; resolved by
            `_init_output` the first time a thread yields a record.
        capacity: forwarded to `_init_output`.

    Returns:
        Tuple (out_queue, task). `out_queue` is what `_init_output` returned,
        or None if no thread produced a record.
    """
    node_name = str(Node.current())
    # The input side is described by `reader`. This function has no `input`
    # argument, so that bare name would resolve to the Python builtin.
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader is not None else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs) as task:
        global_exit_net = core.Net('exit')
        global_init_net = core.Net('init')
        reader.setup_ex(global_init_net, global_exit_net)

        # The writer is shared by all threads and created lazily below.
        out_queue = None
        writer = None

        steps = []
        for thread_id in range(num_threads):
            with NetBuilder(name='t:%d' % thread_id) as nb:
                init_net = core.Net('init')
                exit_net = core.Net('exit')
                read_nets, status, rec = reader.read_record_ex(
                    init_net, exit_net)
                # The stop-status blob starts out False for this thread.
                init_net.ConstantFill(
                    [], [status],
                    shape=[],
                    value=False,
                    dtype=core.DataType.BOOL
                )

                if rec is not None:
                    if writer is None:
                        # hack so that the out queue gets the right name prefix
                        # (otherwise they would be prefixed with the thread id)
                        with NetBuilder(_fullname=task.name):
                            out_queue, writer = _init_output(
                                output, capacity, global_init_net,
                                global_exit_net)
                    write_nets, _ = writer.write_record_ex(
                        rec, init_net, exit_net, status)
                else:
                    write_nets = []

                timer_start_net = core.Net('timer_start')
                timer = timer_start_net.TimerBegin(
                    [], counter_name=profiler_name)
                timer_end_net = core.Net('timer_end')
                timer_end_net.TimerEnd(timer, [])

                ops.net(init_net)
                # Body: start timer, read, write, end timer; stops on
                # `status`.
                ops.net(core.execution_step(
                    'body',
                    [timer_start_net] + list(read_nets) + list(write_nets) +
                    [timer_end_net],
                    should_stop_blob=status))
                # NOTE(review): the end-timer net is run once more after the
                # body, presumably to close a timer left open when the body
                # stops mid-iteration -- confirm.
                ops.net(timer_end_net)
                ops.net(exit_net)
            steps.append(core.to_execution_step(nb))

        ops.net(global_init_net)
        ops.net(core.execution_step('body', steps, concurrent_substeps=True))
        ops.net(global_exit_net)
    return out_queue, task
def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=None,
        final_outputs=None):
    """
    Shared implementation behind `pipe` and `pipe_and_output`.

    Resolves `input` into a reader, optionally wraps it in a
    ProcessingReader, picks a task name, and delegates to the static or
    runtime task builder.

    Args:
        input: a Reader, or an object exposing `reader()`.
        output: forwarded to the task builder; must be None when either
            thread count is 0.
        num_threads: threads unrolled in python; values above 1 select the
            static builder.
        processor: optional processor applied to each record read.
        name: task name; derived from the processor, output or input (in
            that order) when None.
        capacity: forwarded to the task builder.
        group: forwarded to the task builder.
        num_runtime_threads: forwarded as the thread count of the runtime
            builder; None is passed through unchanged.
        final_outputs: forwarded to the task builder.

    Returns:
        (reader, None) when `num_threads` or `num_runtime_threads` is 0,
        otherwise the (out_queue, task) tuple from the task builder.

    Raises:
        ValueError: if `input` is neither a Reader nor has a `reader`
            attribute.
    """
    # `num_runtime_threads` defaults to None, and `None <= 1` raises
    # TypeError on Python 3 (it was True on Python 2), so None is handled
    # explicitly as "not set".
    assert (
        num_threads <= 1 or
        num_runtime_threads is None or
        num_runtime_threads <= 1), (
        'Only one of num_threads or num_runtime_threads must be set.')

    if isinstance(input, Reader):
        reader = input
    elif hasattr(input, 'reader'):
        reader = input.reader()
    else:
        raise ValueError(
            'Input must be a reader, queue or stream. Got {}'.format(
                type(input)))

    if processor is not None:
        reader = ProcessingReader(reader, processor)

    # With zero threads no Task is built; the caller gets the reader back.
    if num_threads == 0 or num_runtime_threads == 0:
        assert output is None
        return reader, None

    if name is None and processor is not None:
        name = processor_name(processor)
    if name is None and output is not None:
        name = 'pipe_into:%s' % processor_name(output)
    if name is None:
        name = 'pipe_from:%s' % processor_name(input)

    if num_threads > 1:
        return _static_threads_task(
            name, group, final_outputs, reader, num_threads, output, capacity)
    else:
        return _runtime_threads_task(
            name, group, final_outputs, reader, num_runtime_threads, output,
            capacity)
class ProcessingReader(Reader):
    """
    Reader that reads from an upstream reader, calls the processor, and returns
    the processed record.
    """
    def __init__(self, reader, processor):
        # `processor` is normalized by make_processor before being stored.
        Reader.__init__(self)
        self.reader = reader
        self.processor = make_processor(processor, reader)

    def schema(self):
        """Return the schema reported by the processor."""
        return self.processor.schema()

    def setup_ex(self, init_net, finish_net):
        """Delegate setup to the upstream reader."""
        self.reader.setup_ex(init_net, finish_net)

    def read_ex(self, init_net, exit_net):
        """
        Read a record from the upstream reader and run the processor on it.

        Returns:
            Tuple (read_nets, status, fields): the upstream read nets extended
            with the processor's nets (plus a stop net when a stop blob is
            present), the upstream status blob, and the field blobs of the
            processor's output record (None if there is no record).
        """
        read_nets, status, rec = self.reader.read_record_ex(init_net, exit_net)
        # We don't use status as the stop_blob of NetBuilder because it is not
        # guaranteed that it would end up being the true stop_blob. For
        # example, ReaderWithLimitBase doesn't pass the status through but
        # rather copies from it.
        with NetBuilder() as nb:
            # Current NetBuilder is optionally used inside the processor,
            # then its children are retrieved inside of
            # normalize_processor_output.
            # Once readers and writers also use NetBuilder,
            # this logic will be more natural.
            result = normalize_processor_output(self.processor(rec))
        # Note: `+=` extends the object returned by the upstream reader.
        read_nets += result.nets
        if result.should_stop or nb._stop_blob:
            # Fold every available stop signal into the upstream status blob.
            stop_net = core.Net('stop_net')
            if result.should_stop:
                stop_net.Or([status, result.should_stop], [status])
            if nb._stop_blob:
                stop_net.Or([status, nb._stop_blob], [status])
            read_nets.append(stop_net)
        if hasattr(self.processor, 'setup'):
            # Processors exposing `setup` are attached to the init net under
            # TaskGroup.LOCAL_SETUP.
            init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor)
        self._set_schema(result.record)
        fields = result.record.field_blobs() if result.record else None
        return read_nets, status, fields
class NetProcessor(object):
    """
    Processor backed by a core.Net.

    Every call clones the net and returns the clone as the processing step.
    The net must have its input record (and optionally its output record)
    set, with net.set_input_record() and net.set_output_record().
    """

    def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None):
        assert isinstance(net, core.Net)
        assert stop_signal is None or isinstance(
            stop_signal, core.BlobReference)
        self.net = net
        self.name = name if name else str(net)
        self.thread_init_nets = thread_init_nets if thread_init_nets else []
        self._stop_signal = stop_signal
        # One remapping dict per call, in call order.
        self._blob_maps = []
        # Clones of `thread_init_nets` not yet handed out by `setup`.
        self._cloned_init_nets = []
        # Once frozen, the processor refuses further calls.
        self._frozen = False

    def schema(self):
        """Return the output record of the wrapped net."""
        return self.net.output_record()

    def setup(self, init_net):
        """Freeze the processor and hand over the pending cloned init nets."""
        self._frozen = True
        pending, self._cloned_init_nets = self._cloned_init_nets, []
        return pending

    def __call__(self, rec):
        """Clone the net bound to `rec` and return it wrapped in an Output."""
        assert not self._frozen
        prefix = NetBuilder.current().name + '/'
        blob_remap = {}

        # Thread-init nets are cloned first and share `blob_remap` with the
        # clone of the main net below.
        for init_net in self.thread_init_nets:
            cloned_init, _ = core.clone_and_bind_net(
                init_net, str(init_net) + prefix, prefix, blob_remap)
            self._cloned_init_nets.append(cloned_init)

        new_net, remappings = core.clone_and_bind_net(
            self.net, str(self.net) + prefix, prefix, blob_remap, rec)

        # Point the stop signal at its remapped blob when it was remapped;
        # otherwise keep the original (or None).
        stop_signal = self._stop_signal
        if stop_signal is not None:
            signal_name = str(stop_signal)
            if signal_name in remappings:
                stop_signal = core.BlobReference(
                    remappings[signal_name],
                    net=new_net)

        self._blob_maps.append(remappings)
        return Output([new_net], new_net.output_record(), stop_signal)

    def blob_maps(self):
        """Freeze the processor and return the collected blob remappings."""
        self._frozen = True
        return self._blob_maps
|