layers.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. ## @package layers
  2. # Module caffe2.python.layers.layers
  3. import logging
  4. from collections import namedtuple
  5. import numpy as np
  6. from caffe2.proto import caffe2_pb2
  7. from caffe2.python import core, schema, scope, utils, workspace
  8. from caffe2.python.layers.tags import TagContext
# Module-level logger; its threshold is pinned to INFO below.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Some types to simplify descriptions of things traveling between ops
# IdList: schema.List whose values are int64.
IdList = schema.List(np.int64)
# IdScoreList: schema.Map with int64 keys and float32 values.
IdScoreList = schema.Map(np.int64, np.float32)
# "WithEvicted" counterparts of the two schemas above; almost_equal_schemas
# treats each of them as interchangeable with its plain version.
IdListWithEvicted = schema.ListWithEvicted(np.int64)
IdScoreListWithEvicted = schema.MapWithEvicted(np.int64, np.float32)
  16. def almost_equal_schemas(
  17. record,
  18. original_schema,
  19. check_field_names=True,
  20. check_field_types=True,
  21. check_field_metas=False,
  22. ):
  23. if original_schema == IdList:
  24. return schema.equal_schemas(
  25. record,
  26. IdList,
  27. check_field_names=check_field_names,
  28. check_field_types=check_field_types,
  29. check_field_metas=check_field_metas,
  30. ) or schema.equal_schemas(
  31. record,
  32. IdListWithEvicted,
  33. check_field_names=check_field_names,
  34. check_field_types=check_field_types,
  35. check_field_metas=check_field_metas,
  36. )
  37. elif original_schema == IdScoreList:
  38. return schema.equal_schemas(
  39. record,
  40. IdScoreList,
  41. check_field_names=check_field_names,
  42. check_field_types=check_field_types,
  43. check_field_metas=check_field_metas,
  44. ) or schema.equal_schemas(
  45. record,
  46. IdScoreListWithEvicted,
  47. check_field_names=check_field_names,
  48. check_field_types=check_field_types,
  49. check_field_metas=check_field_metas,
  50. )
  51. else:
  52. return schema.equal_schemas(record, original_schema)
  53. def get_key(record):
  54. if almost_equal_schemas(record, IdList):
  55. key = "values"
  56. elif almost_equal_schemas(
  57. record, IdScoreList, check_field_types=False
  58. ):
  59. key = "values:keys"
  60. else:
  61. raise NotImplementedError("Not implemented for {}".format(record))
  62. assert record[key].metadata is not None, "Blob {} doesn't have metadata".format(
  63. str(record[key]())
  64. )
  65. return record[key]
  66. def get_categorical_limit(record):
  67. key = get_key(record)
  68. return key.metadata.categorical_limit
  69. def get_avg_length(record):
  70. return record["lengths"].metadata.expected_value
  71. def set_request_only(field):
  72. for f in field.all_scalars():
  73. categorical_limit, expected_value = None, None
  74. if not f.metadata:
  75. feature_specs = schema.FeatureSpec(feature_is_request_only=True)
  76. elif not f.metadata.feature_specs:
  77. categorical_limit = f.metadata.categorical_limit
  78. expected_value = f.metadata.expected_value
  79. feature_specs = schema.FeatureSpec(feature_is_request_only=True)
  80. else:
  81. categorical_limit = f.metadata.categorical_limit
  82. expected_value = f.metadata.expected_value
  83. feature_specs = schema.FeatureSpec(
  84. feature_type=f.metadata.feature_specs.feature_type,
  85. feature_names=f.metadata.feature_specs.feature_names,
  86. feature_ids=f.metadata.feature_specs.feature_ids,
  87. feature_is_request_only=True,
  88. desired_hash_size=f.metadata.feature_specs.desired_hash_size,
  89. )
  90. # make sure not to set categorical_limit for a non-integer field
  91. if not np.issubdtype(f.field_type(), np.integer):
  92. assert (
  93. categorical_limit is None
  94. ), "categorical_limit shouldn't be set for no-integer field"
  95. f.set_metadata(
  96. schema.Metadata(
  97. categorical_limit=categorical_limit,
  98. expected_value=expected_value,
  99. feature_specs=feature_specs,
  100. )
  101. )
  102. class InstantiationContext(object):
  103. """
  104. List of contexts where layer could be instantitated
  105. """
  106. # The layers support this context will accumulate predictions, labels,
  107. # weights. The accumulated data can later be used to compute
  108. # calibration or for other
  109. # purpose.
  110. ACCUMULATE_PRED = "accumulate_pred"
  111. EVAL = "eval"
  112. PREDICTION = "prediction"
  113. TRAINING = "training"
  114. _LAYER_REGISTRY = {}
  115. def register_layer(name, layer):
  116. assert name not in _LAYER_REGISTRY, "{0} already exists".format(name)
  117. _LAYER_REGISTRY[name] = layer
  118. def layer_exists(name):
  119. return name in _LAYER_REGISTRY
  120. def get_layer_class(name):
  121. return _LAYER_REGISTRY[name]
  122. def create_layer(layer_name, *args, **kwargs):
  123. return _LAYER_REGISTRY[layer_name](*args, **kwargs)
  124. LayerPsParam = namedtuple("LayerPsParam", ["sparse_key", "average_length"])
  125. class LayerParameter(object):
  126. def __init__(
  127. self,
  128. parameter=None,
  129. optimizer=None,
  130. initializer=None,
  131. ps_param=None,
  132. regularizer=None,
  133. ):
  134. assert isinstance(
  135. parameter, core.BlobReference
  136. ), "expect {0} to be a blob reference".format(str(parameter))
  137. # need to put the following line (shape) before initialier
  138. # shape will be updated once initializer is (re)set
  139. self._shape = None
  140. self.parameter = parameter
  141. self.optimizer = optimizer
  142. self.initializer = initializer
  143. self.ps_param = ps_param
  144. self.regularizer = regularizer
  145. @property
  146. def initializer(self):
  147. return self._initializer
  148. @initializer.setter
  149. def initializer(self, op):
  150. assert op is None or core.IsOperator(
  151. getattr(op, "type", None)
  152. ), "initializer expects an operator, got type: {}".format(type(op))
  153. self._initializer = op
  154. if op is not None:
  155. self.shape = self._infer_shape_from_initializer()
  156. @property
  157. def shape(self):
  158. return self._shape
  159. @shape.setter
  160. def shape(self, shape):
  161. assert self.shape is None or self.shape == shape, (
  162. "inconsistent shape for layer parameter:"
  163. " {}, expect: {}, but got {}".format(self, self.shape, shape)
  164. )
  165. self._shape = shape
  166. def _infer_shape_from_initializer(self):
  167. for arg in self.initializer.arg:
  168. if arg.name == "shape":
  169. return list(arg.ints)
  170. with workspace.WorkspaceGuard("model_init_by_loading_params"):
  171. try:
  172. net = core.Net("shape_checker")
  173. net._net.op.extend([self.initializer])
  174. shape_blob = net.NextScopedBlob(self.parameter + "_shape")
  175. net.Shape([self.parameter], shape_blob)
  176. workspace.RunNetOnce(net)
  177. shape = workspace.FetchBlob(shape_blob).tolist()
  178. # ResetWorkspace to save memory
  179. workspace.ResetWorkspace()
  180. return shape
  181. except RuntimeError as exp:
  182. logger.warning(
  183. "Cannot infer the shape of blob {} from operator {}: {}".format(
  184. self.parameter, self.initializer.type, exp
  185. )
  186. )
  187. workspace.ResetWorkspace()
  188. return None
  189. def __str__(self):
  190. return str(self.parameter)
  191. def is_request_only_scalar(scalar):
  192. if len(scalar.field_metadata()) == 0:
  193. return False
  194. for metadata in scalar.field_metadata():
  195. if not (
  196. metadata
  197. and metadata.feature_specs
  198. and getattr(metadata.feature_specs, "feature_is_request_only", False)
  199. ):
  200. return False
  201. return True
  202. # Contains features accessed in a model layer of a given type
  203. # `type`: A string representing the kind of feature, consistent with FeatureSpec
  204. # `ids`: A set of feature IDs that are accessed in the model layer
  205. AccessedFeatures = namedtuple("AccessedFeatures", ["type", "ids"])
  206. class ModelLayer(object):
  207. def __init__(
  208. self,
  209. model,
  210. prefix,
  211. input_record,
  212. predict_input_record_fields=None,
  213. tags=None,
  214. **kwargs
  215. ):
  216. """
  217. Base class for model layers. Layer is an abstraction that allows to
  218. provide model description in terms of meta-operators, where each of the
  219. meta-operators can have different implementations for training,
  220. evaluation and prediction, that are instantiated later. As an example
  221. SampledSoftmax can do something related to sampling depending on
  222. supervision during the training and just apply softmax if it's used for
  223. prediction/evaluation.
  224. All inputs/outputs from layers are represented as a record (instance of
  225. schema bounded to blobs) and are accessible through input_record and
  226. output_schema. If Layer needs to have only a subset of inputs/provides
  227. subset of outputs during the inference - it should provide
  228. predict_input_record and predict_output_schema correspondingly (those
  229. records are expected to be a subset of input_record/output_schema).
  230. Each layer has a list of Tags associated with it, that depends on
  231. current context and arguments. It's possible to use those tags during
  232. the instantiation time.
  233. """
  234. self.name = model.next_layer_name(prefix)
  235. self.model = model
  236. self.kwargs = kwargs
  237. self._input_record = input_record
  238. if predict_input_record_fields:
  239. if not isinstance(predict_input_record_fields, list):
  240. predict_input_record_fields = [predict_input_record_fields]
  241. self._predict_input_record = self._input_record[predict_input_record_fields]
  242. else:
  243. self._predict_input_record = None
  244. self.request_only = True
  245. if len(input_record.all_scalars()) == 0:
  246. self.request_only = False
  247. for scalar in input_record.all_scalars():
  248. if not is_request_only_scalar(scalar):
  249. self.request_only = False
  250. break
  251. self.precomputation_request_only = False
  252. self.precomputation_object_only = False
  253. self._output_schema = None
  254. self._predict_output_schema = None
  255. self.eval_output_schema = None
  256. self.tags = set(tags or [])
  257. self.tags.update(TagContext.current().tags)
  258. self.params = []
  259. self._export_output_for_metrics = False
  260. self._export_params_for_metrics = False
  261. def get_type(self):
  262. return self.__class__.__name__
  263. def _check_output_schema(self):
  264. assert self._output_schema is not None, "Schema is not initialized"
  265. assert self._predict_output_schema is None or schema.is_schema_subset(
  266. self._predict_output_schema, self._output_schema
  267. ), "predict_output_schema is not a subset of the output_schema"
  268. @property
  269. def predict_input_record(self):
  270. return self._predict_input_record or self._input_record
  271. @property
  272. def input_record(self):
  273. return self._input_record
  274. @property
  275. def predict_output_schema(self):
  276. self._check_output_schema()
  277. return self._predict_output_schema or self._output_schema
  278. @predict_output_schema.setter
  279. def predict_output_schema(self, output_schema):
  280. assert self._predict_output_schema is None
  281. self._predict_output_schema = output_schema
  282. @property
  283. def output_schema(self):
  284. if self.request_only:
  285. set_request_only(self._output_schema)
  286. self._check_output_schema()
  287. return self._output_schema
  288. @output_schema.setter
  289. def output_schema(self, output_schema):
  290. assert self._output_schema is None
  291. self._output_schema = output_schema
  292. def get_parameters(self):
  293. return self.params
  294. def get_fp16_compatible_parameters(self):
  295. """Return a subset of parameters which can be converted to fp16"""
  296. return []
  297. def get_memory_usage(self):
  298. return 0
  299. def get_accessed_features(self):
  300. """
  301. Return a map from field to list of AccessedFeatures, the map should
  302. contain all features accessed in the model layer
  303. """
  304. return {}
  305. def add_init_params(self, init_net):
  306. """
  307. Adds layer initialization operators to passed net.
  308. """
  309. for param in self.params:
  310. # TODO(amalevich): Either return back to lambdas, that add
  311. # all params (looks a bit safer and breaking less
  312. # abstractions) or extend Net interface to this type of
  313. # operations better
  314. # TODO(xlwang) init_net._net.op has type google.protobuf.\
  315. # internal.containers.RepeatedCompositeFieldContainer, but
  316. # the version of protobuf in fbcode does not support append
  317. # so extend is used
  318. init_op = param.initializer
  319. current_device_scope = scope.CurrentDeviceScope()
  320. if not init_op:
  321. continue
  322. if not init_op.HasField("device_option") and current_device_scope:
  323. init_op = caffe2_pb2.OperatorDef()
  324. init_op.CopyFrom(param.initializer)
  325. init_op.device_option.CopyFrom(current_device_scope)
  326. # do not add duplicated init ops
  327. if any(
  328. utils.OpAlmostEqual(op, init_op, "debug_info")
  329. for op in init_net._net.op
  330. ):
  331. continue
  332. init_net._net.op.extend([init_op])
  333. def create_param(
  334. self, param_name, shape, initializer, optimizer, ps_param=None, regularizer=None
  335. ):
  336. with scope.NameScope(self.name, reset=True):
  337. param = self.model.create_param(
  338. param_name=param_name,
  339. shape=shape,
  340. initializer=initializer,
  341. optimizer=optimizer,
  342. ps_param=ps_param,
  343. regularizer=regularizer,
  344. )
  345. # make sure we don't share parameters in the same layer
  346. assert all(param.parameter != p.parameter for p in self.params)
  347. self.params.append(param)
  348. return param.parameter
  349. def get_next_blob_reference(self, name):
  350. with scope.NameScope(self.name, reset=True):
  351. return self.model.net.NextScopedBlob(name)
  352. def add_operators(self, net, init_net=None, context=InstantiationContext.TRAINING):
  353. """
  354. Adds layer trainig or initialization operators to the passed in net.
  355. init_net can be None and can be called independently from add_init_params
  356. """
  357. # Namescope below should warranty that all intermediate blobs will be
  358. # assiciated with the layer that produces them
  359. with scope.NameScope(self.name):
  360. if context not in {
  361. InstantiationContext.PREDICTION,
  362. InstantiationContext.EVAL,
  363. InstantiationContext.ACCUMULATE_PRED,
  364. }:
  365. assert init_net, "Only prediction and eval context don't need init_net"
  366. if init_net:
  367. self.add_init_params(init_net)
  368. if context == InstantiationContext.TRAINING:
  369. self.add_train_ops(net)
  370. elif context == InstantiationContext.EVAL:
  371. self.add_eval_ops(net)
  372. elif context == InstantiationContext.ACCUMULATE_PRED:
  373. self.add_ops_to_accumulate_pred(net)
  374. else:
  375. self.add_ops(net)
  376. if (
  377. context in {InstantiationContext.TRAINING, InstantiationContext.EVAL}
  378. and self._export_params_for_metrics
  379. ):
  380. self.add_param_copy_operators(net)
  381. def add_ops(self, net):
  382. # Predict layer implementation.
  383. raise NotImplementedError
  384. def add_eval_ops(self, net):
  385. # Default eval layer implementation is completely matching
  386. # predict layer implementation.
  387. self.add_ops(net)
  388. def add_train_ops(self, net):
  389. # Default train layer implementation is completely matching
  390. # eval layer implementation.
  391. self.add_eval_ops(net)
  392. def add_ops_to_accumulate_pred(self, net):
  393. # This adds operators to accumulate predictions/labels/weights. The
  394. # accumulated data can later be used to compute calibration or for other
  395. # purpose. Default layer implementation is completely matching eval
  396. # layer implementation.
  397. self.add_eval_ops(net)
  398. def add_param_copy_operators(self, net):
  399. for param in self.params:
  400. param_copy_ref = self.model.metrics_schema[str(param.parameter)]
  401. net.Copy([param.parameter], param_copy_ref.field_blobs())
  402. def export_output_for_metrics(self):
  403. self._export_output_for_metrics = True
  404. # Export output of the layer directly
  405. export_name = self.name + "/output"
  406. self.model.add_metric_field(export_name, self.output_schema)
  407. def export_params_for_metrics(self):
  408. self._export_params_for_metrics = True
  409. # Export copies of parameters
  410. for param in self.params:
  411. param_copy_ref = self.get_next_blob_reference(
  412. str(param).split("/")[-1] + "_copy"
  413. )
  414. self.model.add_metric_field(str(param.parameter), param_copy_ref)