summaryrefslogtreecommitdiff
path: root/tools/net/ynl/lib/nlspec.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/net/ynl/lib/nlspec.py')
-rw-r--r--tools/net/ynl/lib/nlspec.py614
1 files changed, 0 insertions, 614 deletions
diff --git a/tools/net/ynl/lib/nlspec.py b/tools/net/ynl/lib/nlspec.py
deleted file mode 100644
index a745739655ad..000000000000
--- a/tools/net/ynl/lib/nlspec.py
+++ /dev/null
@@ -1,614 +0,0 @@
-# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
-
-import collections
-import importlib
-import os
-import yaml
-
-
-# To be loaded dynamically as needed
-jsonschema = None
-
-
-class SpecElement:
- """Netlink spec element.
-
- Abstract element of the Netlink spec. Implements the dictionary interface
- for access to the raw spec. Supports iterative resolution of dependencies
- across elements and class inheritance levels. The elements of the spec
- may refer to each other, and although loops should be very rare, having
- to maintain correct ordering of instantiation is painful, so the resolve()
- method should be used to perform parts of init which require access to
- other parts of the spec.
-
- Attributes:
- yaml raw spec as loaded from the spec file
- family back reference to the full family
-
- name name of the entity as listed in the spec (optional)
- ident_name name which can be safely used as identifier in code (optional)
- """
- def __init__(self, family, yaml):
- self.yaml = yaml
- self.family = family
-
- if 'name' in self.yaml:
- self.name = self.yaml['name']
- self.ident_name = self.name.replace('-', '_')
-
- self._super_resolved = False
- family.add_unresolved(self)
-
- def __getitem__(self, key):
- return self.yaml[key]
-
- def __contains__(self, key):
- return key in self.yaml
-
- def get(self, key, default=None):
- return self.yaml.get(key, default)
-
- def resolve_up(self, up):
- if not self._super_resolved:
- up.resolve()
- self._super_resolved = True
-
- def resolve(self):
- pass
-
-
-class SpecEnumEntry(SpecElement):
- """ Entry within an enum declared in the Netlink spec.
-
- Attributes:
- doc documentation string
- enum_set back reference to the enum
- value numerical value of this enum (use accessors in most situations!)
-
- Methods:
- raw_value raw value, i.e. the id in the enum, unlike user value which is a mask for flags
- user_value user value, same as raw value for enums, for flags it's the mask
- """
- def __init__(self, enum_set, yaml, prev, value_start):
- if isinstance(yaml, str):
- yaml = {'name': yaml}
- super().__init__(enum_set.family, yaml)
-
- self.doc = yaml.get('doc', '')
- self.enum_set = enum_set
-
- if 'value' in yaml:
- self.value = yaml['value']
- elif prev:
- self.value = prev.value + 1
- else:
- self.value = value_start
-
- def has_doc(self):
- return bool(self.doc)
-
- def raw_value(self):
- return self.value
-
- def user_value(self, as_flags=None):
- if self.enum_set['type'] == 'flags' or as_flags:
- return 1 << self.value
- else:
- return self.value
-
-
-class SpecEnumSet(SpecElement):
- """ Enum type
-
- Represents an enumeration (list of numerical constants)
- as declared in the "definitions" section of the spec.
-
- Attributes:
- type enum or flags
- entries entries by name
- entries_by_val entries by value
- Methods:
- get_mask for flags compute the mask of all defined values
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
-
- self.type = yaml['type']
-
- prev_entry = None
- value_start = self.yaml.get('value-start', 0)
- self.entries = dict()
- self.entries_by_val = dict()
- for entry in self.yaml['entries']:
- e = self.new_entry(entry, prev_entry, value_start)
- self.entries[e.name] = e
- self.entries_by_val[e.raw_value()] = e
- prev_entry = e
-
- def new_entry(self, entry, prev_entry, value_start):
- return SpecEnumEntry(self, entry, prev_entry, value_start)
-
- def has_doc(self):
- if 'doc' in self.yaml:
- return True
- return self.has_entry_doc()
-
- def has_entry_doc(self):
- for entry in self.entries.values():
- if entry.has_doc():
- return True
- return False
-
- def get_mask(self, as_flags=None):
- mask = 0
- for e in self.entries.values():
- mask += e.user_value(as_flags)
- return mask
-
-
-class SpecAttr(SpecElement):
- """ Single Netlink attribute type
-
- Represents a single attribute type within an attr space.
-
- Attributes:
- type string, attribute type
- value numerical ID when serialized
- attr_set Attribute Set containing this attr
- is_multi bool, attr may repeat multiple times
- struct_name string, name of struct definition
- sub_type string, name of sub type
- len integer, optional byte length of binary types
- display_hint string, hint to help choose format specifier
- when displaying the value
- sub_message string, name of sub message type
- selector string, name of attribute used to select
- sub-message type
-
- is_auto_scalar bool, attr is a variable-size scalar
- """
- def __init__(self, family, attr_set, yaml, value):
- super().__init__(family, yaml)
-
- self.type = yaml['type']
- self.value = value
- self.attr_set = attr_set
- self.is_multi = yaml.get('multi-attr', False)
- self.struct_name = yaml.get('struct')
- self.sub_type = yaml.get('sub-type')
- self.byte_order = yaml.get('byte-order')
- self.len = yaml.get('len')
- self.display_hint = yaml.get('display-hint')
- self.sub_message = yaml.get('sub-message')
- self.selector = yaml.get('selector')
-
- self.is_auto_scalar = self.type == "sint" or self.type == "uint"
-
-
-class SpecAttrSet(SpecElement):
- """ Netlink Attribute Set class.
-
- Represents a ID space of attributes within Netlink.
-
- Note that unlike other elements, which expose contents of the raw spec
- via the dictionary interface Attribute Set exposes attributes by name.
-
- Attributes:
- attrs ordered dict of all attributes (indexed by name)
- attrs_by_val ordered dict of all attributes (indexed by value)
- subset_of parent set if this is a subset, otherwise None
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
-
- self.subset_of = self.yaml.get('subset-of', None)
-
- self.attrs = collections.OrderedDict()
- self.attrs_by_val = collections.OrderedDict()
-
- if self.subset_of is None:
- val = 1
- for elem in self.yaml['attributes']:
- if 'value' in elem:
- val = elem['value']
-
- attr = self.new_attr(elem, val)
- self.attrs[attr.name] = attr
- self.attrs_by_val[attr.value] = attr
- val += 1
- else:
- real_set = family.attr_sets[self.subset_of]
- for elem in self.yaml['attributes']:
- attr = real_set[elem['name']]
- self.attrs[attr.name] = attr
- self.attrs_by_val[attr.value] = attr
-
- def new_attr(self, elem, value):
- return SpecAttr(self.family, self, elem, value)
-
- def __getitem__(self, key):
- return self.attrs[key]
-
- def __contains__(self, key):
- return key in self.attrs
-
- def __iter__(self):
- yield from self.attrs
-
- def items(self):
- return self.attrs.items()
-
-
-class SpecStructMember(SpecElement):
- """Struct member attribute
-
- Represents a single struct member attribute.
-
- Attributes:
- type string, type of the member attribute
- byte_order string or None for native byte order
- enum string, name of the enum definition
- len integer, optional byte length of binary types
- display_hint string, hint to help choose format specifier
- when displaying the value
- struct string, name of nested struct type
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
- self.type = yaml['type']
- self.byte_order = yaml.get('byte-order')
- self.enum = yaml.get('enum')
- self.len = yaml.get('len')
- self.display_hint = yaml.get('display-hint')
- self.struct = yaml.get('struct')
-
-
-class SpecStruct(SpecElement):
- """Netlink struct type
-
- Represents a C struct definition.
-
- Attributes:
- members ordered list of struct members
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
-
- self.members = []
- for member in yaml.get('members', []):
- self.members.append(self.new_member(family, member))
-
- def new_member(self, family, elem):
- return SpecStructMember(family, elem)
-
- def __iter__(self):
- yield from self.members
-
- def items(self):
- return self.members.items()
-
-
-class SpecSubMessage(SpecElement):
- """ Netlink sub-message definition
-
- Represents a set of sub-message formats for polymorphic nlattrs
- that contain type-specific sub messages.
-
- Attributes:
- name string, name of sub-message definition
- formats dict of sub-message formats indexed by match value
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
-
- self.formats = collections.OrderedDict()
- for elem in self.yaml['formats']:
- format = self.new_format(family, elem)
- self.formats[format.value] = format
-
- def new_format(self, family, format):
- return SpecSubMessageFormat(family, format)
-
-
-class SpecSubMessageFormat(SpecElement):
- """ Netlink sub-message format definition
-
- Represents a single format for a sub-message.
-
- Attributes:
- value attribute value to match against type selector
- fixed_header string, name of fixed header, or None
- attr_set string, name of attribute set, or None
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
-
- self.value = yaml.get('value')
- self.fixed_header = yaml.get('fixed-header')
- self.attr_set = yaml.get('attribute-set')
-
-
-class SpecOperation(SpecElement):
- """Netlink Operation
-
- Information about a single Netlink operation.
-
- Attributes:
- value numerical ID when serialized, None if req/rsp values differ
-
- req_value numerical ID when serialized, user -> kernel
- rsp_value numerical ID when serialized, user <- kernel
- modes supported operation modes (do, dump, event etc.)
- is_call bool, whether the operation is a call
- is_async bool, whether the operation is a notification
- is_resv bool, whether the operation does not exist (it's just a reserved ID)
- attr_set attribute set name
- fixed_header string, optional name of fixed header struct
-
- yaml raw spec as loaded from the spec file
- """
- def __init__(self, family, yaml, req_value, rsp_value):
- super().__init__(family, yaml)
-
- self.value = req_value if req_value == rsp_value else None
- self.req_value = req_value
- self.rsp_value = rsp_value
-
- self.modes = yaml.keys() & {'do', 'dump', 'event', 'notify'}
- self.is_call = 'do' in yaml or 'dump' in yaml
- self.is_async = 'notify' in yaml or 'event' in yaml
- self.is_resv = not self.is_async and not self.is_call
- self.fixed_header = self.yaml.get('fixed-header', family.fixed_header)
-
- # Added by resolve:
- self.attr_set = None
- delattr(self, "attr_set")
-
- def resolve(self):
- self.resolve_up(super())
-
- if 'attribute-set' in self.yaml:
- attr_set_name = self.yaml['attribute-set']
- elif 'notify' in self.yaml:
- msg = self.family.msgs[self.yaml['notify']]
- attr_set_name = msg['attribute-set']
- elif self.is_resv:
- attr_set_name = ''
- else:
- raise Exception(f"Can't resolve attribute set for op '{self.name}'")
- if attr_set_name:
- self.attr_set = self.family.attr_sets[attr_set_name]
-
-
-class SpecMcastGroup(SpecElement):
- """Netlink Multicast Group
-
- Information about a multicast group.
-
- Value is only used for classic netlink families that use the
- netlink-raw schema. Genetlink families use dynamic ID allocation
- where the ids of multicast groups get resolved at runtime. Value
- will be None for genetlink families.
-
- Attributes:
- name name of the mulitcast group
- value integer id of this multicast group for netlink-raw or None
- yaml raw spec as loaded from the spec file
- """
- def __init__(self, family, yaml):
- super().__init__(family, yaml)
- self.value = self.yaml.get('value')
-
-
-class SpecFamily(SpecElement):
- """ Netlink Family Spec class.
-
- Netlink family information loaded from a spec (e.g. in YAML).
- Takes care of unfolding implicit information which can be skipped
- in the spec itself for brevity.
-
- The class can be used like a dictionary to access the raw spec
- elements but that's usually a bad idea.
-
- Attributes:
- proto protocol type (e.g. genetlink)
- msg_id_model enum-model for operations (unified, directional etc.)
- license spec license (loaded from an SPDX tag on the spec)
-
- attr_sets dict of attribute sets
- msgs dict of all messages (index by name)
- sub_msgs dict of all sub messages (index by name)
- ops dict of all valid requests / responses
- ntfs dict of all async events
- consts dict of all constants/enums
- fixed_header string, optional name of family default fixed header struct
- mcast_groups dict of all multicast groups (index by name)
- kernel_family dict of kernel family attributes
- """
- def __init__(self, spec_path, schema_path=None, exclude_ops=None):
- with open(spec_path, "r") as stream:
- prefix = '# SPDX-License-Identifier: '
- first = stream.readline().strip()
- if not first.startswith(prefix):
- raise Exception('SPDX license tag required in the spec')
- self.license = first[len(prefix):]
-
- stream.seek(0)
- spec = yaml.safe_load(stream)
-
- self._resolution_list = []
-
- super().__init__(self, spec)
-
- self._exclude_ops = exclude_ops if exclude_ops else []
-
- self.proto = self.yaml.get('protocol', 'genetlink')
- self.msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
-
- if schema_path is None:
- schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
- if schema_path:
- global jsonschema
-
- with open(schema_path, "r") as stream:
- schema = yaml.safe_load(stream)
-
- if jsonschema is None:
- jsonschema = importlib.import_module("jsonschema")
-
- jsonschema.validate(self.yaml, schema)
-
- self.attr_sets = collections.OrderedDict()
- self.sub_msgs = collections.OrderedDict()
- self.msgs = collections.OrderedDict()
- self.req_by_value = collections.OrderedDict()
- self.rsp_by_value = collections.OrderedDict()
- self.ops = collections.OrderedDict()
- self.ntfs = collections.OrderedDict()
- self.consts = collections.OrderedDict()
- self.mcast_groups = collections.OrderedDict()
- self.kernel_family = collections.OrderedDict(self.yaml.get('kernel-family', {}))
-
- last_exception = None
- while len(self._resolution_list) > 0:
- resolved = []
- unresolved = self._resolution_list
- self._resolution_list = []
-
- for elem in unresolved:
- try:
- elem.resolve()
- except (KeyError, AttributeError) as e:
- self._resolution_list.append(elem)
- last_exception = e
- continue
-
- resolved.append(elem)
-
- if len(resolved) == 0:
- raise last_exception
-
- def new_enum(self, elem):
- return SpecEnumSet(self, elem)
-
- def new_attr_set(self, elem):
- return SpecAttrSet(self, elem)
-
- def new_struct(self, elem):
- return SpecStruct(self, elem)
-
- def new_sub_message(self, elem):
- return SpecSubMessage(self, elem);
-
- def new_operation(self, elem, req_val, rsp_val):
- return SpecOperation(self, elem, req_val, rsp_val)
-
- def new_mcast_group(self, elem):
- return SpecMcastGroup(self, elem)
-
- def add_unresolved(self, elem):
- self._resolution_list.append(elem)
-
- def _dictify_ops_unified(self):
- self.fixed_header = self.yaml['operations'].get('fixed-header')
- val = 1
- for elem in self.yaml['operations']['list']:
- if 'value' in elem:
- val = elem['value']
-
- op = self.new_operation(elem, val, val)
- val += 1
-
- self.msgs[op.name] = op
-
- def _dictify_ops_directional(self):
- self.fixed_header = self.yaml['operations'].get('fixed-header')
- req_val = rsp_val = 1
- for elem in self.yaml['operations']['list']:
- if 'notify' in elem or 'event' in elem:
- if 'value' in elem:
- rsp_val = elem['value']
- req_val_next = req_val
- rsp_val_next = rsp_val + 1
- req_val = None
- elif 'do' in elem or 'dump' in elem:
- mode = elem['do'] if 'do' in elem else elem['dump']
-
- v = mode.get('request', {}).get('value', None)
- if v:
- req_val = v
- v = mode.get('reply', {}).get('value', None)
- if v:
- rsp_val = v
-
- rsp_inc = 1 if 'reply' in mode else 0
- req_val_next = req_val + 1
- rsp_val_next = rsp_val + rsp_inc
- else:
- raise Exception("Can't parse directional ops")
-
- if req_val == req_val_next:
- req_val = None
- if rsp_val == rsp_val_next:
- rsp_val = None
-
- skip = False
- for exclude in self._exclude_ops:
- skip |= bool(exclude.match(elem['name']))
- if not skip:
- op = self.new_operation(elem, req_val, rsp_val)
-
- req_val = req_val_next
- rsp_val = rsp_val_next
-
- self.msgs[op.name] = op
-
- def find_operation(self, name):
- """
- For a given operation name, find and return operation spec.
- """
- for op in self.yaml['operations']['list']:
- if name == op['name']:
- return op
- return None
-
- def resolve(self):
- self.resolve_up(super())
-
- definitions = self.yaml.get('definitions', [])
- for elem in definitions:
- if elem['type'] == 'enum' or elem['type'] == 'flags':
- self.consts[elem['name']] = self.new_enum(elem)
- elif elem['type'] == 'struct':
- self.consts[elem['name']] = self.new_struct(elem)
- else:
- self.consts[elem['name']] = elem
-
- for elem in self.yaml['attribute-sets']:
- attr_set = self.new_attr_set(elem)
- self.attr_sets[elem['name']] = attr_set
-
- for elem in self.yaml.get('sub-messages', []):
- sub_message = self.new_sub_message(elem)
- self.sub_msgs[sub_message.name] = sub_message
-
- if self.msg_id_model == 'unified':
- self._dictify_ops_unified()
- elif self.msg_id_model == 'directional':
- self._dictify_ops_directional()
-
- for op in self.msgs.values():
- if op.req_value is not None:
- self.req_by_value[op.req_value] = op
- if op.rsp_value is not None:
- self.rsp_by_value[op.rsp_value] = op
- if not op.is_async and 'attribute-set' in op:
- self.ops[op.name] = op
- elif op.is_async:
- self.ntfs[op.name] = op
-
- mcgs = self.yaml.get('mcast-groups')
- if mcgs:
- for elem in mcgs['list']:
- mcg = self.new_mcast_group(elem)
- self.mcast_groups[elem['name']] = mcg