probe: port `gooftool/probe.py` to probe generic functions. [chromiumos/platform/factory : master]

57 views
Skip to first unread message

ChromeOS bot (Gerrit)

unread,
May 10, 2017, 5:08:47 AM5/10/17
to Chih-Yu Huang, ChromeOS Commit Bot, Yong Hong, Hung-Te Lin, Wei-Han Chen

ChromeOS bot merged this change.

View Change

probe: port `gooftool/probe.py` to probe generic functions.

In order to replace gooftool by the new probe framework, we need to
provide the generic probe function for each device. In this CL, we
ported the probe function for each component from `gooftool/probe.py`.

BUG=chromium:689944
TEST=none

Change-Id: Ia0f28b578296c37fc3c65752d38f43024e01eac1
Reviewed-on: https://chromium-review.googlesource.com/455599
Commit-Ready: Chih-Yu Huang <akah...@chromium.org>
Tested-by: Chih-Yu Huang <akah...@chromium.org>
Reviewed-by: Yong Hong <yh...@chromium.org>
---
A py/probe/functions/chromeos_firmware.py
A py/probe/functions/generic_audio_codec.py
A py/probe/functions/generic_battery.py
A py/probe/functions/generic_bluetooth.py
A py/probe/functions/generic_cpu.py
A py/probe/functions/generic_dram.py
A py/probe/functions/generic_network_device.py
A py/probe/functions/generic_storage.py
A py/probe/functions/generic_tpm.py
A py/probe/functions/generic_usb_hosts.py
A py/probe/functions/generic_video.py
A py/probe/functions/glob_path.py
M py/probe/functions/vpd.py
M py/probe/probe_cmdline.py
14 files changed, 995 insertions(+), 3 deletions(-)

diff --git a/py/probe/functions/chromeos_firmware.py b/py/probe/functions/chromeos_firmware.py
new file mode 100644
index 0000000..cee4b4b
--- /dev/null
+++ b/py/probe/functions/chromeos_firmware.py
@@ -0,0 +1,121 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import re
+import hashlib
+import tempfile
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils.arg_utils import Arg
+from cros.factory.utils import process_utils
+from cros.factory.utils import type_utils
+from cros.factory.gooftool import crosfw
+
+FIELDS = type_utils.Enum(
+ ['firmware_keys', 'ro_main_firmware', 'ro_ec_firmware', 'ro_pd_firmware'])
+
+
+def _FwKeyHash(fw_file_path, key_name):
+ """Hash specified GBB key, extracted by vbutil_key."""
+ known_hashes = {
+ 'b11d74edd286c144e1135b49e7f0bc20cf041f10': 'devkeys/rootkey',
+ 'c14bd720b70d97394257e3e826bd8f43de48d4ed': 'devkeys/recovery',
+ }
+ with tempfile.NamedTemporaryFile(prefix='gbb_%s_' % key_name) as f:
+ process_utils.CheckOutput(
+ 'gbb_utility -g --%s=%s %s' % (key_name, f.name, fw_file_path),
+ shell=True, log=True)
+ key_info = process_utils.CheckOutput(
+ 'vbutil_key --unpack %s' % f.name, shell=True)
+ sha1sum = re.findall(r'Key sha1sum:[\s]+([\w]+)', key_info)
+ if len(sha1sum) != 1:
+ logging.error('Failed calling vbutil_key for firmware key hash.')
+ return None
+ sha1 = sha1sum[0]
+ if sha1 in known_hashes:
+ sha1 += '#' + known_hashes[sha1]
+ return 'kv3#' + sha1
+
+
+def _AddFirmwareIdTag(image, id_name='RO_FRID'):
+ """Returns firmware ID in '#NAME' format if available."""
+ if not image.has_section(id_name):
+ return ''
+ id_stripped = image.get_section(id_name).strip(chr(0))
+ if id_stripped:
+ return '#%s' % id_stripped
+ return ''
+
+
+def _MainRoHash(image):
+ """Algorithm: sha256(fmap, RO_SECTION[-GBB])."""
+ hash_src = image.get_fmap_blob()
+ gbb = image.get_section('GBB')
+ zero_gbb = chr(0) * len(gbb)
+ image.put_section('GBB', zero_gbb)
+ hash_src += image.get_section('RO_SECTION')
+ image.put_section('GBB', gbb)
+ # pylint: disable=E1101
+ return {
+ 'hash': hashlib.sha256(hash_src).hexdigest(),
+ 'version': _AddFirmwareIdTag(image).lstrip('#')}
+
+
+def _EcRoHash(image):
+ """Algorithm: sha256(fmap, EC_RO)."""
+ hash_src = image.get_fmap_blob()
+ hash_src += image.get_section('EC_RO')
+ # pylint: disable=E1101
+ return {
+ 'hash': hashlib.sha256(hash_src).hexdigest(),
+ 'version': _AddFirmwareIdTag(image).lstrip('#')}
+
+
+def CalculateFirmwareHashes(fw_file_path):
+ """Calculate the volatile hashes corresponding to a firmware blob.
+
+ Given a firmware blob, determine what kind of firmware it is based
+ on what sections are present. Then generate a dict containing the
+ corresponding hash values.
+ """
+ raw_image = open(fw_file_path, 'rb').read()
+ try:
+ image = crosfw.FirmwareImage(raw_image)
+ except: # pylint: disable=W0702
+ return None
+
+ if image.has_section('EC_RO'):
+ return _EcRoHash(image)
+ elif image.has_section('GBB') and image.has_section('RO_SECTION'):
+ return _MainRoHash(image)
+
+
+class ChromeosFirmwareFunction(function.ProbeFunction):
+ """Get information of flash chip."""
+
+ ARGS = [
+ Arg('field', str,
+ 'The flash chip. It should be one of {%s}' %
+ ', '.join(FIELDS)),
+ ]
+
+ def Probe(self):
+ if self.args.field not in FIELDS:
+ return function.NOTHING
+
+ if self.args.field == FIELDS.firmware_keys:
+ fw_file_path = crosfw.LoadMainFirmware().GetFileName()
+ return {
+ 'key_recovery': _FwKeyHash(fw_file_path, 'recoverykey'),
+ 'key_root': _FwKeyHash(fw_file_path, 'rootkey')}
+
+ if self.args.field == FIELDS.ro_main_firmware:
+ fw_file_path = crosfw.LoadMainFirmware().GetFileName()
+ if self.args.field == FIELDS.ro_ec_firmware:
+ fw_file_path = crosfw.LoadEcFirmware().GetFileName()
+ if self.args.field == FIELDS.ro_pd_firmware:
+ fw_file_path = crosfw.LoadPDFirmware().GetFileName()
+ return CalculateFirmwareHashes(fw_file_path)
diff --git a/py/probe/functions/generic_audio_codec.py b/py/probe/functions/generic_audio_codec.py
new file mode 100644
index 0000000..2db43e7
--- /dev/null
+++ b/py/probe/functions/generic_audio_codec.py
@@ -0,0 +1,58 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+import re
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils import process_utils
+
+
+RESULT_KEY = 'name'
+
+
+class GenericAudioCodecFunction(function.ProbeFunction):
+ """Probe the generic audio codec information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ def Probe(self):
+ """Looks for codec strings.
+
+ Collect /sys/kernel/debug/asoc/codecs for ASOC (ALSA SOC) drivers,
+ /proc/asound for HDA codecs, then PCM details.
+
+ There is a set of known invalid codec names that are not included in the
+ return value.
+ """
+ KNOWN_INVALID_CODEC_NAMES = set([
+ 'snd-soc-dummy',
+ 'ts3a227e.4-003b', # autonomous audiojack switch, not an audio codec
+ 'dw-hdmi-audio' # this is a virtual audio codec driver
+ ])
+
+ results = []
+ asoc_path = '/sys/kernel/debug/asoc/codecs'
+ if os.path.exists(asoc_path):
+ with open(asoc_path) as f:
+ results = [codec.strip()
+ for codec in f.read().splitlines()
+ if codec not in KNOWN_INVALID_CODEC_NAMES]
+
+ grep_result = process_utils.SpawnOutput(
+ 'grep -R "Codec:" /proc/asound/*', shell=True, log=True)
+ match_set = set()
+ for line in grep_result.splitlines():
+ match_set |= set(re.findall(r'.*Codec:(.*)', line))
+ results += [match.strip() for match in sorted(match_set) if match]
+
+ if not results:
+ # Formatted '00-00: WM??? PCM wm???-hifi-0: ...'
+ with open('/proc/asound/pcm', 'r') as f:
+ pcm_data = f.read().strip().split(' ')
+ if len(pcm_data) > 2:
+ results.append(pcm_data[1])
+ return [{RESULT_KEY: result} for result in results]
diff --git a/py/probe/functions/generic_battery.py b/py/probe/functions/generic_battery.py
new file mode 100644
index 0000000..6bbc3fa
--- /dev/null
+++ b/py/probe/functions/generic_battery.py
@@ -0,0 +1,30 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe.functions import sysfs
+
+
+class GenericBatteryFunction(sysfs.SysfsFunction):
+ """Probe the generic battery information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ ARGS = []
+
+ def __init__(self, **kwargs):
+ super(GenericBatteryFunction, self).__init__(**kwargs)
+
+ self.args.dir_path = '/sys/class/power_supply/*'
+ self.args.keys = ['manufacturer', 'model_name', 'technology', 'type']
+ self.args.optional_keys = ['charge_full_design', 'energy_full_design']
+
+ def Probe(self):
+ def ValidBatteryResult(result):
+ return result.pop('type') == 'Battery' and (
+ 'charge_full_design' in result or 'energy_full_design' in result)
+
+ return [result for result in super(GenericBatteryFunction, self).Probe()
+ if ValidBatteryResult(result)]
diff --git a/py/probe/functions/generic_bluetooth.py b/py/probe/functions/generic_bluetooth.py
new file mode 100644
index 0000000..5ca9d04
--- /dev/null
+++ b/py/probe/functions/generic_bluetooth.py
@@ -0,0 +1,89 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+
+
+def _ProbePCIOrUSB(path):
+ return (function.InterpretFunction({'pci': path})() or
+ function.InterpretFunction({'usb': path})())
+
+
+def _RecursiveProbe(path, read_method):
+ """Recursively probes in path and all the subdirectory using read_method.
+
+ Args:
+ path: Root path of the recursive probing.
+ read_method: The method used to probe device information.
+ This method accepts an input path and returns a string.
+ e.g. _ReadSysfsUsbFields, _ReadSysfsPciFields, or _ReadSysfsDeviceId.
+
+ Returns:
+ A list of strings containing the probed results under path and
+ all of its subdirectories. Duplicated data will be omitted.
+ """
+ def _InternalRecursiveProbe(path, visited_path, results, read_method):
+ """Recursively probes in path and all the subdirectory using read_method.
+
+ Args:
+ path: Root path of the recursive probing.
+ visited_path: A set containing visited paths. These paths will not
+ be visited again.
+ results: A list of string which contains probed results.
+ This list will be appended through the recursive probing.
+ read_method: The method used to probe device information.
+ This method accepts an input path and returns a string.
+
+ Returns:
+ No return value. results in the input will be appended with probed
+ information. Duplicated data will be omitted.
+ """
+ path = os.path.realpath(path)
+ if path in visited_path:
+ return
+
+ if os.path.isdir(path):
+ data = read_method(path)
+ # Only append new data
+ for result in data:
+ if result not in results:
+ results.append(result)
+ entries_list = os.listdir(path)
+ visited_path.add(path)
+ else:
+ return
+
+ for filename in entries_list:
+ # Do not search directory upward
+ if filename == 'subsystem':
+ continue
+ sub_path = os.path.join(path, filename)
+ _InternalRecursiveProbe(sub_path, visited_path, results, read_method)
+ return
+
+ visited_path = set()
+ results = []
+ _InternalRecursiveProbe(path, visited_path, results, read_method)
+ return results
+
+
+class GenericBluetoothFunction(function.ProbeFunction):
+ """Probe the generic Bluetooth information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ def Probe(self):
+ # Probe in primary path
+ device_id = _ProbePCIOrUSB('/sys/class/bluetooth/hci0/device')
+ if device_id:
+ return device_id
+ # TODO(akahuang): Confirm if we only probe the primary path or not.
+ # Use information in driver if probe failed in primary path
+ device_id_list = _RecursiveProbe('/sys/module/bluetooth/holders',
+ _ProbePCIOrUSB)
+ return sorted(x for x in device_id_list if x)
diff --git a/py/probe/functions/generic_cpu.py b/py/probe/functions/generic_cpu.py
new file mode 100644
index 0000000..4d4214f
--- /dev/null
+++ b/py/probe/functions/generic_cpu.py
@@ -0,0 +1,75 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import re
+import subprocess
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils import process_utils
+from cros.factory.utils.arg_utils import Arg
+from cros.factory.utils import type_utils
+
+
+KNOWN_CPU_TYPES = type_utils.Enum(['x86', 'arm'])
+
+
+class GenericCPUFunction(function.ProbeFunction):
+ """Probe the generic CPU information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ ARGS = [
+ Arg('cpu_type', str, 'The type of CPU. "x86" or "arm".', default=None),
+ ]
+
+ def Probe(self):
+ if self.args.cpu_type is None:
+ logging.info('cpu_type is not assigned. Determine by crossystem.')
+ self.args.cpu_type = process_utils.CheckOutput(
+ 'crossystem arch', shell=True)
+ if self.args.cpu_type not in KNOWN_CPU_TYPES:
+ logging.error('cpu_type should be one of %r.', list(KNOWN_CPU_TYPES))
+ return function.NOTHING
+
+ if self.args.cpu_type == KNOWN_CPU_TYPES.x86:
+ return self._ProbeX86()
+ if self.args.cpu_type == KNOWN_CPU_TYPES.arm:
+ return self._ProbeArm()
+
+ def _ProbeX86(self):
+ cmd = r'sed -nr "s/^model name\s*: (.*)/\1/p" /proc/cpuinfo'
+ try:
+ stdout = process_utils.CheckOutput(cmd, shell=True, log=True).splitlines()
+ except subprocess.CalledProcessError:
+ return function.NOTHING
+ return {
+ 'model': stdout[0].strip(),
+ 'cores': str(len(stdout))}
+
+ def _ProbeArm(self):
+ # For platforms like arm, it sometimes gives the model name in 'Processor',
+ # and sometimes in 'model name'. But they all give something like 'ARMv7
+ # Processor rev 4 (v71)' only. So to uniquely identify an ARM CPU, we should
+ # use the 'Hardware' field.
+ CPU_INFO_FILE = '/proc/cpuinfo'
+ with open(CPU_INFO_FILE, 'r') as f:
+ cpuinfo = f.read()
+
+ def _SearchCPUInfo(regex, name):
+ matched = re.search(regex, cpuinfo, re.MULTILINE)
+ if matched is None:
+ logging.error('Unable to find "%s" field in %s.', name, CPU_INFO_FILE)
+ return 'unknown'
+ return matched.group(1)
+
+ model = _SearchCPUInfo(r'^(?:Processor|model name)\s*: (.*)$', 'model')
+ hardware = _SearchCPUInfo(r'^Hardware\s*: (.*)$', 'hardware')
+ cores = process_utils.CheckOutput('nproc', shell=True, log=True)
+ return {
+ 'model': model.strip(),
+ 'cores': cores.strip(),
+ 'hardware': hardware.strip()}
diff --git a/py/probe/functions/generic_dram.py b/py/probe/functions/generic_dram.py
new file mode 100644
index 0000000..b014be3
--- /dev/null
+++ b/py/probe/functions/generic_dram.py
@@ -0,0 +1,43 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import re
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils import process_utils
+from cros.factory.utils import sys_utils
+
+
+class GenericDRAMFunction(function.ProbeFunction):
+ """Probe the generic DRAM information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ def Probe(self):
+ """Combine mosys memory timing and geometry information."""
+ # TODO(tammo): Document why mosys cannot load i2c_dev itself.
+ sys_utils.LoadKernelModule('i2c_dev', error_on_fail=False)
+ part_data = process_utils.CheckOutput(
+ 'mosys -k memory spd print id', shell=True, log=True)
+ timing_data = process_utils.CheckOutput(
+ 'mosys -k memory spd print timings', shell=True, log=True)
+ size_data = process_utils.CheckOutput(
+ 'mosys -k memory spd print geometry', shell=True, log=True)
+ parts = dict(re.findall('dimm="([^"]*)".*part_number="([^"]*)"', part_data))
+ timings = dict(re.findall('dimm="([^"]*)".*speeds="([^"]*)"', timing_data))
+ sizes = dict(re.findall('dimm="([^"]*)".*size_mb="([^"]*)"', size_data))
+
+ results = []
+ for slot in sorted(parts):
+ part = parts[slot]
+ size = sizes[slot]
+ timing = timings[slot].replace(' ', '')
+ results.append({
+ 'slot': slot,
+ 'part': part,
+ 'size': size,
+ 'timing': timing})
+ return results
diff --git a/py/probe/functions/generic_network_device.py b/py/probe/functions/generic_network_device.py
new file mode 100644
index 0000000..b407f22
--- /dev/null
+++ b/py/probe/functions/generic_network_device.py
@@ -0,0 +1,218 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import re
+import sys
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils.arg_utils import Arg
+from cros.factory.utils import process_utils
+from cros.factory.utils import type_utils
+from cros.factory.utils.type_utils import Obj
+
+try:
+ sys.path.append('/usr/local/lib/flimflam/test')
+ import flimflam # pylint: disable=F0401
+except: # pylint: disable=W0702
+ pass
+
+
+KNOWN_DEVICE_TYPES = type_utils.Enum(['wireless', 'ethernet', 'cellular'])
+
+
+class NetworkDevices(object):
+ """A general probing module for network devices."""
+
+ cached_dev_list = None
+
+ @classmethod
+ def _GetIwconfigDevices(cls, extension='IEEE 802.11'):
+ """Wrapper around iwconfig(8) information.
+
+ Example output:
+
+ eth0 no wireless extensions.
+
+ wlan0 IEEE 802.11abgn ESSID:off/any
+ Mod:Managed Access Point: Not-Associated Tx-Power=20 dBm
+ ...
+
+ Returns a list of network objects having WiFi extension.
+ """
+ return [Obj(devtype='wifi',
+ path='/sys/class/net/%s/device' % node.split()[0])
+ for node in process_utils.CheckOutput('iwconfig').splitlines()
+ if extension in node]
+
+ @classmethod
+ def _GetIwDevices(cls, iw_type='managed'):
+ """Wrapper around iw(8) information.
+
+ Command 'iw' explicitly says "Do NOT screenscrape this tool", but we have
+ no better solution. A typical output for 'iw dev' on mwifiex:
+
+ phy#0
+ Interface p2p0
+ ifindex 4
+ wdev 0x3
+ addr 28:c2:dd:45:94:39
+ type P2P-client
+ Interface uap0
+ ifindex 3
+ wdev 0x2
+ addr 28:c2:dd:45:94:39
+ type AP
+ Interface mlan0
+ ifindex 2
+ wdev 0x1
+ addr 28:c2:dd:45:94:39
+ type managed
+
+ p2p0 and uap0 are virtual nodes and what we really want is mlan0 (managed).
+
+ Returns:
+ A list of network objects with correct iw type.
+ """
+ data = [line.split()[1]
+ for line in process_utils.CheckOutput(
+ 'iw dev', shell=True, log=True).splitlines()
+ if ' ' in line and line.split()[0] in ['Interface', 'type']]
+ i = iter(data)
+ return [Obj(devtype='wifi', path='/sys/class/net/%s/device' % name)
+ for name in i if i.next() == iw_type]
+
+ @classmethod
+ def _GetFlimflamDevices(cls):
+ """Wrapper around flimflam (shill), the ChromeOS connection manager.
+
+ This object is a wrapper around the data from the flimflam module, providing
+ dbus format post processing.
+
+ Returns:
+ A list of network objects in Obj, having:
+ devtype: A string in flimflam Type (wifi, cellular, ethernet).
+ path: A string for /sys node device path.
+ attributes: A dictionary for additional attributes.
+ """
+ def _ProcessDevice(device):
+ properties = device.GetProperties()
+ get_prop = lambda p: flimflam.convert_dbus_value(properties[p])
+ result = Obj(
+ devtype=get_prop('Type'),
+ path='/sys/class/net/%s/device' % get_prop('Interface'))
+ if result.devtype == 'cellular':
+ result.attributes = dict(
+ (key, get_prop('Cellular.%s' % key))
+ for key in ['Carrier', 'FirmwareRevision', 'HardwareRevision',
+ 'ModelID', 'Manufacturer']
+ if 'Cellular.%s' % key in properties)
+ return result
+
+ return [_ProcessDevice(device) for device in
+ flimflam.FlimFlam().GetObjectList('Device')]
+
+ @classmethod
+ def GetDevices(cls, devtype=None):
+ """Returns network device information by given type.
+
+ Returned data is a list of Objs corresponding to detected devices.
+ Each has devtype (in same way as flimflam type classification) and path
+ (location of related data in sysfs) fields. For cellular devices, there is
+ also an attributes field which contains a dict of attribute:value items.
+ """
+ if cls.cached_dev_list is None:
+ dev_list = cls._GetFlimflamDevices()
+
+ # On some Brillo (AP-type) devices, WiFi interfaces are blacklisted by
+ # shill and need to be discovered manually, so we have to try 'iwconfig'
+ # or 'iw dev' to get a more correct list.
+ # 'iwconfig' is easier to parse, but some WiFi drivers, for example
+ # mwifiex, do not support wireless extensions and only provide the new
+ # CFG80211/NL80211. Also mwifiex will create two more virtual nodes 'uap0,
+ # p2p0' so we can't rely on globbing /sys/class/net/*/wireless. The only
+ # solution is to trust 'iw dev'.
+
+ existing_nodes = [dev.path for dev in dev_list]
+ dev_list += [dev for dev in cls._GetIwconfigDevices()
+ if dev.path not in existing_nodes]
+
+ existing_nodes = [dev.path for dev in dev_list]
+ dev_list += [dev for dev in cls._GetIwDevices()
+ if dev.path not in existing_nodes]
+
+ cls.cached_dev_list = dev_list
+
+ return [dev for dev in cls.cached_dev_list
+ if devtype is None or dev.devtype == devtype]
+
+ @classmethod
+ def ReadSysfsDeviceIds(cls, devtype, ignore_usb=False):
+ """Return _ReadSysfsDeviceId result for each device of specified type."""
+ def ProbeSysfsDevices(path, ignore_usb):
+ ret = function.InterpretFunction({'pci': path})()
+ if ret:
+ return ret
+ if not ignore_usb:
+ ret = function.InterpretFunction({'usb': path})()
+ return ret
+
+ ret = []
+ for dev in cls.GetDevices(devtype):
+ ret += ProbeSysfsDevices(dev.path, ignore_usb)
+ # Filter out 'None' results
+ return sorted(device for device in ret if device is not None)
+
+
+class GenericNetworkDeviceFunction(function.ProbeFunction):
+ """Probes the information of network devices.
+
+ This function gets information of all network devices,
+ and then filters the results by the given arguments.
+ """
+
+ ARGS = [
+ Arg('device_type', str, 'The type of network device. '
+ 'One of "wireless", "ethernet", "cellular".'),
+ ]
+
+ def Probe(self):
+ if self.args.device_type not in KNOWN_DEVICE_TYPES:
+ logging.error('device_type should be one of %s.', KNOWN_DEVICE_TYPES)
+ return function.NOTHING
+ function_table = {
+ 'wireless': self.ProbeWireless,
+ 'ethernet': self.ProbeEthernet,
+ 'cellular': self.ProbeCellular}
+ devices = function_table[self.args.device_type]()
+ return devices
+
+ def ProbeWireless(self):
+ return NetworkDevices.ReadSysfsDeviceIds('wifi')
+
+ def ProbeEthernet(self):
+ # Built-in ethernet devices should not be attached to USB. They are usually
+ # either PCI or SOC.
+ return NetworkDevices.ReadSysfsDeviceIds('ethernet', ignore_usb=True)
+
+ def ProbeCellular(self):
+ # It is found that some cellular components may have their interface listed
+ # in shill but not available from /sys (for example, shill
+ # Interface=no_netdev_23 but no /sys/class/net/no_netdev_23). Meanwhile,
+ # 'modem status' gives the right Device info like
+ # 'Device: /sys/devices/ff500000.usb/usb1/1-1'. Unfortunately, information
+ # collected by shill, 'modem status', or the USB node under Device is not
+ # always in sync.
+ data = (NetworkDevices.ReadSysfsDeviceIds('cellular') or
+ [dev.attributes for dev in NetworkDevices.GetDevices('cellular')])
+ if data:
+ modem_status = process_utils.CheckOutput(
+ 'modem status', shell=True, log=True)
+ for key in ['carrier', 'firmware_revision', 'Revision']:
+ matches = re.findall(
+ r'^\s*' + key + ': (.*)$', modem_status, re.M)
+ if matches:
+ data[0][key] = matches[0]
+ return data
diff --git a/py/probe/functions/generic_storage.py b/py/probe/functions/generic_storage.py
new file mode 100644
index 0000000..c6844bb
--- /dev/null
+++ b/py/probe/functions/generic_storage.py
@@ -0,0 +1,127 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import glob
+import logging
+import os
+import re
+import string # pylint: disable=W0402
+import struct
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.probe.functions import sysfs
+from cros.factory.utils import file_utils
+from cros.factory.utils import process_utils
+
+
+STORAGE_SYSFS_PATH = '/sys/class/block/*'
+
+
+def GetFixedDevices():
+ """Returns paths to all fixed storage devices on the system."""
+ ret = []
+ for node in sorted(glob.glob(STORAGE_SYSFS_PATH)):
+ path = os.path.join(node, 'removable')
+ if not os.path.exists(path) or file_utils.ReadFile(path).strip() != '0':
+ continue
+ if re.match(r'^loop|^dm-', os.path.basename(node)):
+ # Loopback or dm-verity device; skip
+ continue
+ ret.append(node)
+ return ret
+
+
+def GetEMMC5FirmwareVersion(node_path):
+ """Extracts eMMC 5.0 firmware version from EXT_CSD[254:261].
+
+ Args:
+ node_path: the node_path returned by GetFixedDevices(). For example,
+ '/sys/class/block/mmcblk0'.
+
+ Returns:
+ A string indicating the firmware version if the firmware version is found.
+ Returns None if the firmware version is not present.
+ """
+ ext_csd = process_utils.GetLines(process_utils.SpawnOutput(
+ 'mmc extcsd read /dev/%s' % os.path.basename(node_path),
+ shell=True, log=True))
+ # The firmware version is output as a hexdump of either an ASCII string
+ # or hexadecimal values, always in 8 characters.
+ # For example, version 'ABCDEFGH' is:
+ # [FIRMWARE_VERSION[261]]: 0x48
+ # [FIRMWARE_VERSION[260]]: 0x47
+ # [FIRMWARE_VERSION[259]]: 0x46
+ # [FIRMWARE_VERSION[258]]: 0x45
+ # [FIRMWARE_VERSION[257]]: 0x44
+ # [FIRMWARE_VERSION[256]]: 0x43
+ # [FIRMWARE_VERSION[255]]: 0x42
+ # [FIRMWARE_VERSION[254]]: 0x41
+ #
+ # Some vendors might use hexadecimal values for it.
+ # For example, version 3 is:
+ # [FIRMWARE_VERSION[261]]: 0x00
+ # [FIRMWARE_VERSION[260]]: 0x00
+ # [FIRMWARE_VERSION[259]]: 0x00
+ # [FIRMWARE_VERSION[258]]: 0x00
+ # [FIRMWARE_VERSION[257]]: 0x00
+ # [FIRMWARE_VERSION[256]]: 0x00
+ # [FIRMWARE_VERSION[255]]: 0x00
+ # [FIRMWARE_VERSION[254]]: 0x03
+ #
+ # To handle both cases, this function returns a 64-bit hexadecimal value
+ # and will try to decode it as an ASCII string or as a 64-bit little-endian
+ # integer. It returns '4142434445464748 (ABCDEFGH)' for the first example
+ # and returns '0300000000000000 (3)' for the second example.
+
+ pattern = re.compile(r'^\[FIRMWARE_VERSION\[(\d+)\]\]: (.*)$')
+ data = dict(m.groups() for m in map(pattern.match, ext_csd) if m)
+ if not data:
+ return None
+
+ raw_version = [int(data[str(i)], 0) for i in range(254, 262)]
+ version = ''.join(('%02x' % c for c in raw_version))
+
+ # Try to decode it as an ASCII string.
+ # Note that the vendor may choose SPACE (0x20) or NUL (0x00) to pad the
+ # version string, so we want to strip both in the human readable part.
+ ascii = ''.join(map(chr, raw_version)).strip(' \0')
+ if len(ascii) > 0 and all(c in string.printable for c in ascii):
+ version += ' (%s)' % ascii
+ else:
+ # Try to decode it as a 64-bit little-endian integer.
+ version += ' (%s)' % struct.unpack_from('<q', version.decode('hex'))
+ return version
+
+
+class GenericStorageFunction(function.ProbeFunction):
+ """Probe the generic storage information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+ ATA_FIELDS = ['vendor', 'model']
+ EMMC_FIELDS = ['type', 'name', 'hwrev', 'oemid', 'manfid']
+ # Another field 'cid' is a combination of all other fields so we should not
+ # include it again.
+ EMMC_OPTIONAL_FIELDS = ['prv']
+
+ def Probe(self):
+ return [ident for ident in map(self._ProcessNode, GetFixedDevices())
+ if ident is not None]
+
+ def _ProcessNode(self, node_path):
+ logging.info('Processing the node: %s', node_path)
+ dev_path = os.path.join(node_path, 'device')
+ data = (sysfs.ReadSysfs(dev_path, self.ATA_FIELDS) or
+ sysfs.ReadSysfs(dev_path, self.EMMC_FIELDS,
+ self.EMMC_OPTIONAL_FIELDS))
+ if not data:
+ return None
+ emmc5_fw_ver = GetEMMC5FirmwareVersion(node_path)
+ if emmc5_fw_ver is not None:
+ data['emmc5_fw_ver'] = emmc5_fw_ver
+ size_path = os.path.join(os.path.dirname(dev_path), 'size')
+ data['sectors'] = (file_utils.ReadFile(size_path).strip()
+ if os.path.exists(size_path) else '')
+ return data
diff --git a/py/probe/functions/generic_tpm.py b/py/probe/functions/generic_tpm.py
new file mode 100644
index 0000000..030cfd5
--- /dev/null
+++ b/py/probe/functions/generic_tpm.py
@@ -0,0 +1,26 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils import process_utils
+
+
+class GenericTPMFunction(function.ProbeFunction):
+ """Probe the generic TPM information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ def Probe(self):
+ tpm_data = [line.partition(':') for line in
+ process_utils.CheckOutput('tpm_version').splitlines()]
+ tpm_dict = dict((key.strip(), value.strip()) for
+ key, _, value in tpm_data)
+ mfg = tpm_dict.get('Manufacturer Info', None)
+ version = tpm_dict.get('Chip Version', None)
+ if mfg is not None and version is not None:
+ return {'manufacturer_info': mfg,
+ 'version': version}
+ return None
diff --git a/py/probe/functions/generic_usb_hosts.py b/py/probe/functions/generic_usb_hosts.py
new file mode 100644
index 0000000..f2682fb
--- /dev/null
+++ b/py/probe/functions/generic_usb_hosts.py
@@ -0,0 +1,40 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import glob
+import logging
+import os
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils import process_utils
+
+
+USB_SYSFS_PATH = '/sys/bus/usb/devices/usb*'
+
+
+class GenericUSBHostFunction(function.ProbeFunction):
+ """Probe the generic USB host information.
+
+ The function is ported from `py/gooftool/probe.py` module.
+ """
+
+ def Probe(self):
+ # On x86, USB hosts are PCI devices, located in parent of root USB.
+ # On ARM and others, use the root device itself.
+ arch = process_utils.CheckOutput('crossystem arch', shell=True)
+ relpath = '.' if arch == 'arm' else '..'
+ usb_bus_list = glob.glob(USB_SYSFS_PATH)
+ usb_host_list = [
+ os.path.abspath(os.path.join(os.path.realpath(path), relpath))
+ for path in usb_bus_list]
+ logging.debug('Paths of usb hosts: %s', usb_host_list)
+
+ ret = []
+ for path in usb_host_list:
+ results = (function.InterpretFunction({'pci': path})() or
+ function.InterpretFunction({'usb': path})())
+ if results:
+ ret += results
+ return ret
diff --git a/py/probe/functions/generic_video.py b/py/probe/functions/generic_video.py
new file mode 100644
index 0000000..a2d7ee5
--- /dev/null
+++ b/py/probe/functions/generic_video.py
@@ -0,0 +1,130 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import glob
+import re
+import os
+import fcntl
+import array
+import struct
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils import file_utils
+
+
+def _GetV4L2Data(video_idx):
+  """Queries a video node through the video4linux2 (v4l2) interface.
+
+  The query is best-effort: a node that can not be opened, or an ioctl that
+  fails, simply contributes nothing to the result.
+
+  Args:
+    video_idx: The index N of the `/dev/videoN` node to query.
+
+  Returns:
+    A dict which may contain the keys:
+      'ident': 'V4L2:<ident> <revision>' (both in hex), present only when the
+          chip identifier read back is at least V4L2_VALID_IDENT.
+      'type': 'webcam' or 'video_codec', chosen from the capability bits.
+    The dict is empty when nothing could be determined.
+  """
+  # See /usr/include/linux/videodev2.h for definition of these consts.
+  # 'ident' values are defined in include/media/v4l2-chip-ident.h
+  info = {}
+  VIDIOC_DBG_G_CHIP_IDENT = 0xc02c5651
+  V4L2_DBG_CHIP_IDENT_SIZE = 11
+  V4L2_INDEX_REVISION = V4L2_DBG_CHIP_IDENT_SIZE - 1
+  V4L2_INDEX_IDENT = V4L2_INDEX_REVISION - 1
+  V4L2_VALID_IDENT = 3  # V4L2_IDENT_UNKNOWN + 1
+
+  # Get v4l2 capability
+  V4L2_CAPABILITY_FORMAT = '<16B32B32BII4I'
+  V4L2_CAPABILITY_STRUCT_SIZE = struct.calcsize(V4L2_CAPABILITY_FORMAT)
+  # Dropping the trailing 'I4I' leaves exactly the fields located in front of
+  # `capabilities`, so the size of that prefix is the offset of the field.
+  V4L2_CAPABILITIES_OFFSET = struct.calcsize(V4L2_CAPABILITY_FORMAT[0:-3])
+  # struct v4l2_capability
+  # {
+  #   __u8 driver[16];
+  #   __u8 card[32];
+  #   __u8 bus_info[32];
+  #   __u32 version;
+  #   __u32 capabilities; /* V4L2_CAPABILITIES_OFFSET */
+  #   __u32 reserved[4];
+  # };
+
+  IOCTL_VIDIOC_QUERYCAP = 0x80685600
+
+  # Webcam should have CAPTURE capability but no OUTPUT capability.
+  V4L2_CAP_VIDEO_CAPTURE = 0x00000001
+  V4L2_CAP_VIDEO_OUTPUT = 0x00000002
+
+  # V4L2 encode/decode device should have the following capabilities.
+  V4L2_CAP_VIDEO_CAPTURE_MPLANE = 0x00001000
+  V4L2_CAP_VIDEO_OUTPUT_MPLANE = 0x00002000
+  V4L2_CAP_STREAMING = 0x04000000
+  V4L2_CAP_VIDEO_CODEC = (V4L2_CAP_VIDEO_CAPTURE_MPLANE |
+                          V4L2_CAP_VIDEO_OUTPUT_MPLANE |
+                          V4L2_CAP_STREAMING)
+
+  def _TryIoctl(fileno, request, *args):
+    """Try to invoke ioctl without raising an exception if it fails."""
+    # Only `Exception` is swallowed so that KeyboardInterrupt and SystemExit
+    # still propagate.  After a failure the caller's buffer keeps its zero
+    # fill, which none of the checks below accept.
+    try:
+      fcntl.ioctl(fileno, request, *args)
+    except Exception:  # pylint: disable=broad-except
+      pass
+
+  try:
+    with open('/dev/video%d' % video_idx, 'r+') as f:
+      # Read chip identifier.
+      buf = array.array('i', [0] * V4L2_DBG_CHIP_IDENT_SIZE)
+      _TryIoctl(f.fileno(), VIDIOC_DBG_G_CHIP_IDENT, buf, 1)
+      v4l2_ident = buf[V4L2_INDEX_IDENT]
+      if v4l2_ident >= V4L2_VALID_IDENT:
+        info['ident'] = 'V4L2:%04x %04x' % (v4l2_ident,
+                                            buf[V4L2_INDEX_REVISION])
+      # Read V4L2 capabilities.
+      buf = array.array('B', [0] * V4L2_CAPABILITY_STRUCT_SIZE)
+      _TryIoctl(f.fileno(), IOCTL_VIDIOC_QUERYCAP, buf, 1)
+      capabilities = struct.unpack_from('<I', buf, V4L2_CAPABILITIES_OFFSET)[0]
+      if ((capabilities & V4L2_CAP_VIDEO_CAPTURE) and
+          (not capabilities & V4L2_CAP_VIDEO_OUTPUT)):
+        info['type'] = 'webcam'
+      elif capabilities & V4L2_CAP_VIDEO_CODEC == V4L2_CAP_VIDEO_CODEC:
+        info['type'] = 'video_codec'
+  except Exception:  # pylint: disable=broad-except
+    # Best effort: e.g. the node does not exist or can not be opened.  Keep
+    # whatever was gathered so far.
+    pass
+  return info
+
+
+class GenericVideoFunction(function.ProbeFunction):
+  """Probes every video4linux node found under sysfs.
+
+  The function is ported from `py/gooftool/probe.py` module.
+  """
+
+  def Probe(self):
+    """Builds one result dict per `/sys/class/video4linux/video*` node.
+
+    Returns:
+      A list with the non-empty result dicts.  A dict merges the `pci` or
+      `usb` probe result of the node's device (or the sanitized content of
+      the node's `name` file when neither matched), the optional
+      'wMaxPacketSize' and control 'name' values, and the v4l2 data.
+    """
+    probed = []
+    for node_path in glob.glob('/sys/class/video4linux/video*'):
+      logging.debug('Find the node: %s', node_path)
+      entry = {}
+
+      device_path = os.path.join(node_path, 'device')
+      bus_results = (function.InterpretFunction({'pci': device_path})() or
+                     function.InterpretFunction({'usb': device_path})())
+      assert len(bus_results) <= 1
+      if len(bus_results) == 1:
+        entry.update(bus_results[0])
+      else:
+        # Neither bus function matched: fall back to the node's own name,
+        # turning NUL bytes into blanks and squeezing whitespace runs.
+        name_path = os.path.join(node_path, 'name')
+        if os.path.exists(name_path):
+          device_id = file_utils.ReadFile(name_path)
+          if device_id:
+            entry['name'] = ' '.join(device_id.replace(chr(0), ' ').split())
+
+      # TODO(akahuang): Check if these fields are needed.
+      # Also check video max packet size
+      packet_size_path = os.path.join(device_path, 'ep_82', 'wMaxPacketSize')
+      if os.path.isfile(packet_size_path):
+        entry['wMaxPacketSize'] = file_utils.ReadFile(packet_size_path).strip()
+      # For SOC videos
+      control_name_path = os.path.join(device_path, 'control', 'name')
+      if os.path.isfile(control_name_path):
+        entry['name'] = file_utils.ReadFile(control_name_path).strip()
+      # Get video4linux2 (v4l2) result.
+      node_index = int(re.search(r'video(\d+)$', node_path).group(1))
+      v4l2_data = _GetV4L2Data(node_index)
+      if v4l2_data:
+        entry.update(v4l2_data)
+
+      # Nodes that yielded no information at all are left out.
+      if entry:
+        probed.append(entry)
+    return probed
diff --git a/py/probe/functions/glob_path.py b/py/probe/functions/glob_path.py
new file mode 100644
index 0000000..2b11a3f
--- /dev/null
+++ b/py/probe/functions/glob_path.py
@@ -0,0 +1,31 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import glob
+import os
+
+import factory_common # pylint: disable=unused-import
+from cros.factory.probe import function
+from cros.factory.utils.arg_utils import Arg
+
+
+DEFAULT_KEY = 'path'
+
+
+class GlobPathFunction(function.ProbeFunction):
+  """Lists every path that matches a glob pattern.
+
+  Each match becomes one single-entry dict stored under the configured key.
+  """
+  ARGS = [
+      Arg('pathname', str, 'The file path of target file.'),
+      Arg('key', str, 'The key of the result.',
+          default=DEFAULT_KEY),
+      Arg('filename_only', bool,
+          'True to return the file name instead of the whole path.',
+          default=False),
+  ]
+
+  def Probe(self):
+    """Returns a list of `{key: path}` dicts, one per matched path."""
+    result_key = self.args.key
+    matches = glob.glob(self.args.pathname)
+    if self.args.filename_only:
+      # Keep only the last path component of every match.
+      return [{result_key: os.path.basename(match)} for match in matches]
+    return [{result_key: match} for match in matches]
diff --git a/py/probe/functions/vpd.py b/py/probe/functions/vpd.py
index 9af6d44..ed8fcc8 100644
--- a/py/probe/functions/vpd.py
+++ b/py/probe/functions/vpd.py
@@ -11,7 +11,8 @@
"""Reads the information from VPD."""

ARGS = [
- Arg('key', str, 'The field of VPD.'),
+ Arg('field', str, 'The field of VPD.'),
+ Arg('key', str, 'The key of the result.', default=None),
Arg('from_rw', bool,
'True to read from RW_VPD, and False to read from RO_VPD. '
'Default is to read from RO_VPD.', default=False),
@@ -21,5 +22,7 @@
super(VPDFunction, self).__init__(**kwargs)

partition = 'RW_VPD' if self.args.from_rw else 'RO_VPD'
- self.args.command = 'vpd -i %s -g %s' % (partition, self.args.key)
+ self.args.command = 'vpd -i %s -g %s' % (partition, self.args.field)
self.args.split_line = False
+ if self.args.key is None:
+ self.args.key = self.args.field
diff --git a/py/probe/probe_cmdline.py b/py/probe/probe_cmdline.py
index 415e1f7..bdbae18 100755
--- a/py/probe/probe_cmdline.py
+++ b/py/probe/probe_cmdline.py
@@ -226,10 +226,11 @@
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(level=level, stream=sys.stderr)

+
def Main():
options = ParseOptions()
SetRootLogger(options.verbose)
- options._Command(options)
+ options._Command(options) # pylint: disable=protected-access


if __name__ == '__main__':

To view, visit change 455599. To unsubscribe, visit settings.

Gerrit-Project: chromiumos/platform/factory
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: Ia0f28b578296c37fc3c65752d38f43024e01eac1
Gerrit-Change-Number: 455599
Gerrit-PatchSet: 8
Gerrit-Owner: Chih-Yu Huang <akah...@chromium.org>
Gerrit-Reviewer: Chih-Yu Huang <akah...@chromium.org>
Gerrit-Reviewer: Hung-Te Lin <hun...@chromium.org>
Gerrit-Reviewer: Wei-Han Chen <sti...@chromium.org>
Gerrit-Reviewer: Yong Hong <yh...@chromium.org>
Gerrit-CC: ChromeOS Commit Bot <chromeos-...@chromium.org>
Reply all
Reply to author
Forward
0 new messages