Source code for usbtmc.usbtmc

"""

Python USBTMC driver

Copyright (c) 2012-2016 Alex Forencich

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

"""

import usb.core
import usb.util
import struct
import time
import os
import re
import sys

# constants
USBTMC_bInterfaceClass    = 0xFE
USBTMC_bInterfaceSubClass = 3
USBTMC_bInterfaceProtocol = 0
USB488_bInterfaceProtocol = 1

USBTMC_MSGID_DEV_DEP_MSG_OUT            = 1
USBTMC_MSGID_REQUEST_DEV_DEP_MSG_IN     = 2
USBTMC_MSGID_DEV_DEP_MSG_IN             = 2
USBTMC_MSGID_VENDOR_SPECIFIC_OUT        = 126
USBTMC_MSGID_REQUEST_VENDOR_SPECIFIC_IN = 127
USBTMC_MSGID_VENDOR_SPECIFIC_IN         = 127
USB488_MSGID_TRIGGER                    = 128

USBTMC_STATUS_SUCCESS                  = 0x01
USBTMC_STATUS_PENDING                  = 0x02
USBTMC_STATUS_FAILED                   = 0x80
USBTMC_STATUS_TRANSFER_NOT_IN_PROGRESS = 0x81
USBTMC_STATUS_SPLIT_NOT_IN_PROGRESS    = 0x82
USBTMC_STATUS_SPLIT_IN_PROGRESS        = 0x83
USB488_STATUS_INTERRUPT_IN_BUSY        = 0x20

USBTMC_REQUEST_INITIATE_ABORT_BULK_OUT     = 1
USBTMC_REQUEST_CHECK_ABORT_BULK_OUT_STATUS = 2
USBTMC_REQUEST_INITIATE_ABORT_BULK_IN      = 3
USBTMC_REQUEST_CHECK_ABORT_BULK_IN_STATUS  = 4
USBTMC_REQUEST_INITIATE_CLEAR              = 5
USBTMC_REQUEST_CHECK_CLEAR_STATUS          = 6
USBTMC_REQUEST_GET_CAPABILITIES            = 7
USBTMC_REQUEST_INDICATOR_PULSE             = 64

USB488_READ_STATUS_BYTE = 128
USB488_REN_CONTROL      = 160
USB488_GOTO_LOCAL       = 161
USB488_LOCAL_LOCKOUT    = 162

USBTMC_HEADER_SIZE = 12

def parse_visa_resource_string(resource_string):
    # valid resource strings:
    # USB::1234::5678::INSTR
    # USB::1234::5678::SERIAL::INSTR
    # USB0::0x1234::0x5678::INSTR
    # USB0::0x1234::0x5678::SERIAL::INSTR
    m = re.match('^(?P<prefix>(?P<type>USB)\d*)(::(?P<arg1>[^\s:]+))'
        '(::(?P<arg2>[^\s:]+(\[.+\])?))(::(?P<arg3>[^\s:]+))?'
        '(::(?P<suffix>INSTR))$', resource_string, re.I)

    if m is not None:
        return dict(
            type = m.group('type').upper(),
            prefix = m.group('prefix'),
            arg1 = m.group('arg1'),
            arg2 = m.group('arg2'),
            arg3 = m.group('arg3'),
            suffix = m.group('suffix')
        )

# Exceptions
class UsbtmcException(Exception):
    em = {0:  "No error"}

    def __init__(self, err = None, note = None):
        self.err = err
        self.note = note
        self.msg = ''

        if err is None:
            self.msg = note
        else:
            if type(err) is int:
                if err in self.em:
                    self.msg = "%d: %s" % (err, self.em[err])
                else:
                    self.msg = "%d: Unknown error" % err
            else:
                self.msg = err
            if note is not None:
                self.msg = "%s [%s]" % (self.msg, note)

    def __str__(self):
        return self.msg

def list_devices():
    "List all connected USBTMC devices"

    def is_usbtmc_device(dev):
        for cfg in dev:
            d = usb.util.find_descriptor(cfg, bInterfaceClass = USBTMC_bInterfaceClass,
                                        bInterfaceSubClass = USBTMC_bInterfaceSubClass)
            is_advantest = dev.idVendor == 0x1334
            return d is not None or is_advantest

    return list(usb.core.find(find_all = True, custom_match = is_usbtmc_device))

def find_device(idVendor = None, idProduct = None, iSerial = None):
    "Find USBTMC instrument"

    devs = list_devices()

    if len(devs) == 0:
        return None

    for dev in devs:
        if dev.idVendor != idVendor or dev.idProduct != idProduct:
            continue

        if iSerial is None:
            return dev
        else:
            s = ''

            # try reading serial number
            try:
                s = dev.serial_number
            except:
                pass

            if iSerial == s:
                return dev

    return None

[docs]class Instrument(object): "USBTMC instrument interface client" def __init__(self, *args, **kwargs): "Create new USBTMC instrument object" self.idVendor = 0 self.idProduct = 0 self.iSerial = None self.device = None self.cfg = None self.iface = None self.term_char = None self.advantest_quirk = False self.advantest_locked = False self.bcdUSBTMC = 0 self.support_pulse = False self.support_talk_only = False self.support_listen_only = False self.support_term_char = False self.bcdUSB488 = 0 self.support_USB4882 = False self.support_remote_local = False self.support_trigger = False self.support_scpi = False self.support_SR = False self.support_RL = False self.support_DT = False self.max_transfer_size = 1024*1024 self.timeout = 1000 self.bulk_in_ep = None self.bulk_out_ep = None self.interrupt_in_ep = None self.last_btag = 0 self.last_rstb_btag = 0 self.connected = False self.reattach = [] self.old_cfg = None resource = None # process arguments if len(args) == 1: if type(args[0]) == str: resource = args[0] else: self.device = args[0] if len(args) >= 2: self.idVendor = args[0] self.idProduct = args[1] if len(args) >= 3: self.iSerial = args[2] for op in kwargs: val = kwargs[op] if op == 'idVendor': self.idVendor = val elif op == 'idProduct': self.idProduct = val elif op == 'iSerial': self.iSerial = val elif op == 'device': self.device = val elif op == 'dev': self.device = val elif op == 'term_char': self.term_char = val elif op == 'resource': resource = val if resource is not None: res = parse_visa_resource_string(resource) if res is None: raise UsbtmcException("Invalid resource string", 'init') if res['arg1'] is None and res['arg2'] is None: raise UsbtmcException("Invalid resource string", 'init') self.idVendor = int(res['arg1'], 0) self.idProduct = int(res['arg2'], 0) self.iSerial = res['arg3'] # find device if self.device is None: if self.idVendor is None or self.idProduct is None: raise UsbtmcException("No device specified", 'init') else: self.device = find_device(self.idVendor, self.idProduct, self.iSerial) if self.device is None: raise UsbtmcException("Device not found", 'init') def __del__(self): if self.connected: self.close() def open(self): if self.connected: return # initialize device # find first USBTMC interface for cfg in self.device: for iface in cfg: if (self.device.idVendor == 0x1334) or \ (iface.bInterfaceClass == USBTMC_bInterfaceClass and iface.bInterfaceSubClass == USBTMC_bInterfaceSubClass): self.cfg = cfg self.iface = iface break else: continue break if self.iface is None: raise UsbtmcException("Not a USBTMC device", 'init') self.old_cfg = self.device.get_active_configuration() if self.old_cfg.bConfigurationValue == self.cfg.bConfigurationValue: # already set to correct configuration # release kernel driver on USBTMC interface if os.name == 'posix': if self.device.is_kernel_driver_active(self.iface.bInterfaceNumber): self.reattach.append(self.iface.bInterfaceNumber) try: self.device.detach_kernel_driver(self.iface.bInterfaceNumber) except usb.core.USBError as e: sys.exit("Could not detatch kernel driver from interface({0}): {1}".format(self.iface.bInterfaceNumber, str(e))) else: # wrong configuration # release all kernel drivers if os.name == 'posix': for iface in self.old_cfg: if self.device.is_kernel_driver_active(iface.bInterfaceNumber): self.reattach.append(iface.bInterfaceNumber) try: self.device.detach_kernel_driver(iface.bInterfaceNumber) except usb.core.USBError as e: sys.exit("Could not detatch kernel driver from interface({0}): {1}".format(iface.bInterfaceNumber, str(e))) # set proper configuration self.device.set_configuration(self.cfg) # don't need to set altsetting - USBTMC devices have 1 altsetting as per the spec # find endpoints for ep in self.iface: ep_dir = usb.util.endpoint_direction(ep.bEndpointAddress) ep_type = usb.util.endpoint_type(ep.bmAttributes) if (ep_type == usb.util.ENDPOINT_TYPE_BULK): if (ep_dir == usb.util.ENDPOINT_IN): self.bulk_in_ep = ep elif (ep_dir == usb.util.ENDPOINT_OUT): self.bulk_out_ep = ep elif (ep_type == usb.util.ENDPOINT_TYPE_INTR): if (ep_dir == usb.util.ENDPOINT_IN): self.interrupt_in_ep = ep if self.bulk_in_ep is None or self.bulk_out_ep is None: raise UsbtmcException("Invalid endpoint configuration", 'init') # set quirk flags if necessary if self.device.idVendor == 0x1334: # Advantest/ADCMT devices have a very odd USBTMC implementation # which requires max 63 byte reads and never signals EOI on read self.max_transfer_size = 63 self.advantest_quirk = True self.connected = True self.clear() self.get_capabilities() def close(self): if not self.connected: return usb.util.dispose_resources(self.device) try: # reset configuration if self.cfg.bConfigurationValue != self.old_cfg.bConfigurationValue: self.device.set_configuration(self.old_cfg) # try to reattach kernel driver for iface in self.reattach: try: self.device.attach_kernel_driver(iface) except: pass except: pass self.reattach = [] self.connected = False def is_usb488(self): return self.iface.bInterfaceProtocol == USB488_bInterfaceProtocol def get_capabilities(self): if not self.connected: self.open() b=self.device.ctrl_transfer( usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), USBTMC_REQUEST_GET_CAPABILITIES, 0x0000, self.iface.index, 0x0018, timeout=self.timeout) if (b[0] == USBTMC_STATUS_SUCCESS): self.bcdUSBTMC = (b[3] << 8) + b[2] self.support_pulse = b[4] & 4 != 0 self.support_talk_only = b[4] & 2 != 0 self.support_listen_only = b[4] & 1 != 0 self.support_term_char = b[5] & 1 != 0 if self.is_usb488(): self.bcdUSB488 = (b[13] << 8) + b[12] self.support_USB4882 = b[4] & 4 != 0 self.support_remote_local = b[4] & 2 != 0 self.support_trigger = b[4] & 1 != 0 self.support_scpi = b[4] & 8 != 0 self.support_SR = b[4] & 4 != 0 self.support_RL = b[4] & 2 != 0 self.support_DT = b[4] & 1 != 0 else: raise UsbtmcException("Get capabilities failed", 'get_capabilities')
[docs] def pulse(self): """ Send a pulse indicator request, this should blink a light for 500-1000ms and then turn off again. (Only if supported) """ if not self.connected: self.open() if self.support_pulse: b = self.device.ctrl_transfer( usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), USBTMC_REQUEST_INDICATOR_PULSE, 0x0000, self.iface.index, 0x0001, timeout=self.timeout) if (b[0] != USBTMC_STATUS_SUCCESS): raise UsbtmcException("Pulse failed", 'pulse')
# message header management def pack_bulk_out_header(self, msgid): self.last_btag = btag = (self.last_btag % 255) + 1 return struct.pack('BBBx', msgid, btag, ~btag & 0xFF) def pack_dev_dep_msg_out_header(self, transfer_size, eom = True): hdr = self.pack_bulk_out_header(USBTMC_MSGID_DEV_DEP_MSG_OUT) return hdr+struct.pack("<LBxxx", transfer_size, eom) def pack_dev_dep_msg_in_header(self, transfer_size, term_char = None): hdr = self.pack_bulk_out_header(USBTMC_MSGID_DEV_DEP_MSG_IN) transfer_attributes = 0 if term_char is None: term_char = 0 else: transfer_attributes = 2 term_char = self.term_char return hdr+struct.pack("<LBBxx", transfer_size, transfer_attributes, term_char) def pack_vendor_specific_out_header(self, transfer_size): hdr = self.pack_bulk_out_header(USBTMC_MSGID_VENDOR_SPECIFIC_OUT) return hdr+struct.pack("<Lxxxx", transfer_size) def pack_vendor_specific_in_header(self, transfer_size): hdr = self.pack_bulk_out_header(USBTMC_MSGID_VENDOR_SPECIFIC_IN) return hdr+struct.pack("<Lxxxx", transfer_size) def pack_usb488_trigger(self): hdr = self.pack_bulk_out_header(USB488_MSGID_TRIGGER) return hdr+b'\x00'*8 def unpack_bulk_in_header(self, data): msgid, btag, btaginverse = struct.unpack_from('BBBx', data) return (msgid, btag, btaginverse) def unpack_dev_dep_resp_header(self, data): msgid, btag, btaginverse = self.unpack_bulk_in_header(data) transfer_size, transfer_attributes = struct.unpack_from('<LBxxx', data, 4) data = data[USBTMC_HEADER_SIZE:transfer_size+USBTMC_HEADER_SIZE] return (msgid, btag, btaginverse, transfer_size, transfer_attributes, data)
[docs] def write_raw(self, data): "Write binary data to instrument" if not self.connected: self.open() eom = False num = len(data) offset = 0 while num > 0: if num <= self.max_transfer_size: eom = True block = data[offset:offset+self.max_transfer_size] size = len(block) req = self.pack_dev_dep_msg_out_header(size, eom) + block + b'\0'*((4 - (size % 4)) % 4) self.bulk_out_ep.write(req) offset += size num -= size
[docs] def read_raw(self, num=-1): "Read binary data from instrument" if not self.connected: self.open() read_len = self.max_transfer_size if num > 0 and num < read_len: read_len = num eom = False term_char = None if self.term_char is not None: term_char = self.term_char read_data = b'' while not eom: req = self.pack_dev_dep_msg_in_header(read_len, term_char) self.bulk_out_ep.write(req) resp = self.bulk_in_ep.read(read_len+USBTMC_HEADER_SIZE+3, timeout = self.timeout) if sys.version_info >= (3, 0): resp = resp.tobytes() else: resp = resp.tostring() msgid, btag, btaginverse, transfer_size, transfer_attributes, data = self.unpack_dev_dep_resp_header(resp) eom = transfer_attributes & 1 read_data += data # Advantest devices never signal EOI and may only send one read packet if self.advantest_quirk: break if num > 0: num = num - len(data) if num <= 0: break if num < read_len: read_len = num return read_data
[docs] def ask_raw(self, data, num=-1): "Write then read binary data" # Advantest/ADCMT hardware won't respond to a command unless it's in Local Lockout mode was_locked = self.advantest_locked try: if self.advantest_quirk and not was_locked: self.lock() self.write_raw(data) return self.read_raw(num) finally: if self.advantest_quirk and not was_locked: self.unlock()
[docs] def write(self, message, encoding = 'utf-8'): "Write string to instrument" if type(message) is tuple or type(message) is list: # recursive call for a list of commands for message_i in message: self.write(message_i, encoding) return self.write_raw(str(message).encode(encoding))
[docs] def read(self, num=-1, encoding = 'utf-8'): "Read string from instrument" return self.read_raw(num).decode(encoding).rstrip('\r\n')
[docs] def ask(self, message, num=-1, encoding = 'utf-8'): "Write then read string" if type(message) is tuple or type(message) is list: # recursive call for a list of commands val = list() for message_i in message: val.append(self.ask(message_i, num, encoding)) return val # Advantest/ADCMT hardware won't respond to a command unless it's in Local Lockout mode was_locked = self.advantest_locked try: if self.advantest_quirk and not was_locked: self.lock() self.write(message, encoding) return self.read(num, encoding) finally: if self.advantest_quirk and not was_locked: self.unlock()
[docs] def read_stb(self): "Read status byte" if not self.connected: self.open() if self.is_usb488(): rstb_btag = (self.last_rstb_btag % 128) + 1 if rstb_btag < 2: rstb_btag = 2 self.last_rstb_btag = rstb_btag b=self.device.ctrl_transfer( usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), USB488_READ_STATUS_BYTE, rstb_btag, self.iface.index, 0x0003, timeout=self.timeout) if (b[0] == USBTMC_STATUS_SUCCESS): # check btag if rstb_btag != b[1]: raise UsbtmcException("Read status byte btag mismatch", 'read_stb') if self.interrupt_in_ep is None: # no interrupt channel, value is here return b[2] else: # read response from interrupt channel resp = self.interrupt_in_ep.read(2, timeout=self.timeout) if resp[0] != rstb_btag + 128: raise UsbtmcException("Read status byte btag mismatch", 'read_stb') else: return resp[1] else: raise UsbtmcException("Read status failed", 'read_stb') else: return int(self.ask("*STB?"))
[docs] def trigger(self): "Send trigger command" if not self.connected: self.open() if self.support_trigger: data = self.pack_usb488_trigger() print(repr(data)) self.bulk_out_ep.write(data) else: self.write("*TRG")
[docs] def clear(self): "Send clear command" if not self.connected: self.open() # Send INITIATE_CLEAR b = self.device.ctrl_transfer( usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), USBTMC_REQUEST_INITIATE_CLEAR, 0x0000, self.iface.index, 0x0001, timeout=self.timeout) if (b[0] == USBTMC_STATUS_SUCCESS): # Initiate clear succeeded, wait for completion while True: # Check status b = self.device.ctrl_transfer( usb.util.build_request_type(usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE), USBTMC_REQUEST_CHECK_CLEAR_STATUS, 0x0000, self.iface.index, 0x0002, timeout=self.timeout) if (b[0] == USBTMC_STATUS_PENDING): time.sleep(0.1) else: break # Clear halt condition self.bulk_out_ep.clear_halt() else: raise UsbtmcException("Clear failed", 'clear')
[docs] def remote(self): "Send remote command" raise NotImplementedError()
[docs] def local(self): "Send local command" raise NotImplementedError()
[docs] def lock(self): "Send lock command" if not self.connected: self.open() if self.advantest_quirk: # This Advantest/ADCMT vendor-specific control command enables remote control and must be sent before any commands are exchanged # (otherwise READ commands will only retrieve the latest measurement) self.advantest_locked = True self.device.ctrl_transfer(bmRequestType=0xA1, bRequest=0xA0, wValue=0x0001, wIndex=0x0000, data_or_wLength=1) else: raise NotImplementedError()
[docs] def unlock(self): "Send unlock command" if not self.connected: self.open() if self.advantest_quirk: # This Advantest/ADCMT vendor-specific control command enables remote control and must be sent before any commands are exchanged # (otherwise READ commands will only retrieve the latest measurement) self.advantest_locked = False self.device.ctrl_transfer(bmRequestType=0xA1, bRequest=0xA0, wValue=0x0000, wIndex=0x0000, data_or_wLength=1) else: raise NotImplementedError()
def advantest_read_myid(self): if not self.connected: self.open() "Read MyID value from Advantest and ADCMT devices" if self.advantest_quirk: # This Advantest/ADCMT vendor-specific control command reads the "MyID" identifier try: return int(self.device.ctrl_transfer(bmRequestType=0xC1, bRequest=0xF5, wValue=0x0000, wIndex=0x0000, data_or_wLength=1)[0]) except: return None else: raise NotImplementedError()