# Source code for pyvisa.highlevel

# -*- coding: utf-8 -*-
"""
    pyvisa.highlevel
    ~~~~~~~~~~~~~~~~

    High level Visa library wrapper.

    This file is part of PyVISA.

    :copyright: 2014 by PyVISA Authors, see AUTHORS for more details.
    :license: MIT, see LICENSE for more details.
"""

import time
import atexit
import warnings
import contextlib
from collections import defaultdict

from . import logger
from .constants import *
from . import ctwrapper
from . import errors
from .util import (warning_context, split_kwargs, warn_for_invalid_kwargs,
                   parse_ascii, parse_binary, get_library_paths)


def add_visa_methods(wrapper_module):
    """Build a class decorator that attaches the wrapper's VISA functions.

    Every name listed in `wrapper_module.visa_functions` is copied from the
    module onto the decorated class. If the class already defines an attribute
    with that name, the wrapper function is stored under the same name
    prefixed with an underscore so the class' own definition is kept.

    :param wrapper_module: the python module/package that wraps the visa library.
    :return: a decorator taking a class and returning that same class.
    """
    def _decorate(target):
        target._wrapper_module = wrapper_module
        for name in wrapper_module.visa_functions:
            # Keep an existing high level method; expose the raw one as _name.
            attr_name = '_' + name if hasattr(target, name) else name
            setattr(target, attr_name, getattr(wrapper_module, name))
        return target

    return _decorate


@add_visa_methods(ctwrapper)
class VisaLibrary(object):
    """High level VISA Library wrapper.

    The easiest way to instantiate the library is to let `pyvisa` find the
    right one for you. This looks first in your configuration file
    (~/.pyvisarc). If it fails, it uses `ctypes.util.find_library` to try to
    locate a library in a way similar to what the compiler does:

        >>> visa_library = VisaLibrary()

    But you can also specify the path:

        >>> visa_library = VisaLibrary('/my/path/visa.so')

    Or use the `from_paths` constructor if you want to try multiple paths:

        >>> visa_library = VisaLibrary.from_paths('/my/path/visa.so', '/maybe/this/visa.so')

    Instances are cached per library path: asking twice for the same path
    returns the same object.

    :param library_path: path of the VISA library.
    """

    #: Maps library path to VisaLibrary object
    _registry = dict()

    @classmethod
    def from_paths(cls, *paths):
        """Helper constructor that tries to instantiate VisaLibrary from
        several possible library paths, returning the first that loads.

        :param paths: candidate library paths, given either as separate
                      arguments or as a single list/tuple of paths.
        :raises OSError: if none of the paths could be opened. The message
                         lists the error obtained for each path.
        """
        # The class docstring historically showed a single list argument;
        # accept that form as well as separate positional arguments.
        if len(paths) == 1 and isinstance(paths[0], (list, tuple)):
            paths = tuple(paths[0])

        errs = []
        for path in paths:
            try:
                return cls(path)
            except OSError as e:
                logger.debug('Could not open VISA library %s: %s', path, str(e))
                errs.append(str(e))

        raise OSError('Could not open VISA library:\n' + '\n'.join(errs))

    def __new__(cls, library_path=None):
        """Return the (cached) wrapper for `library_path`.

        :param library_path: path of the VISA library. If None, the paths
                             returned by `get_library_paths` are tried in turn.
        :raises OSError: if no library path is known or the library cannot
                         be loaded.
        """
        if library_path is None:
            paths = get_library_paths(cls._wrapper_module)
            if not paths:
                raise OSError('Could not found VISA library. '
                              'Please install VISA or pass its location as an argument.')
            return cls.from_paths(*paths)

        if library_path in cls._registry:
            return cls._registry[library_path]

        obj = super(VisaLibrary, cls).__new__(cls)

        try:
            obj.lib = cls._wrapper_module.Library(library_path)
        except OSError as exc:
            raise errors.LibraryError.from_exception(exc, library_path)

        obj.library_path = library_path

        logger.debug('Created library wrapper for %s', library_path)

        # Set the argtypes, restype and errcheck for each function
        # of the visa library. Additionally store in `_functions` the
        # name of the functions.
        cls._wrapper_module.set_signatures(obj.lib, errcheck=obj._return_handler)

        # Set the library functions as attributes of the object.
        for method_name in getattr(obj.lib, '_functions', []):
            setattr(obj, method_name, getattr(obj.lib, method_name))

        #: Error codes on which to issue a warning.
        obj.issue_warning_on = set([VI_SUCCESS_MAX_CNT, VI_SUCCESS_DEV_NPRESENT,
                                    VI_SUCCESS_SYNC, VI_WARN_QUEUE_OVERFLOW,
                                    VI_WARN_CONFIG_NLOADED, VI_WARN_NULL_OBJECT,
                                    VI_WARN_NSUP_ATTR_STATE, VI_WARN_UNKNOWN_STATUS,
                                    VI_WARN_NSUP_BUF, VI_WARN_EXT_FUNC_NIMPL])

        #: Contains all installed event handlers.
        #: Its elements are tuples with three elements: The handler itself (a Python
        #: callable), the user handle (as a ct object) and the handler again, this
        #: time as a ct object created with CFUNCTYPE.
        obj.handlers = defaultdict(list)

        #: Last return value of the library.
        obj._status = 0

        #: Default ResourceManager instance for this library.
        obj._resource_manager = None

        obj._logging_extra = {'library_path': obj.library_path}

        # Register only once the object is fully initialised, so that a failed
        # load never leaves a half-built instance behind in the cache.
        cls._registry[library_path] = obj

        return obj

    def __str__(self):
        return 'Visa Library at %s' % self.library_path

    def __repr__(self):
        return '<VisaLibrary(%r)>' % self.library_path

    @property
    def status(self):
        """Last return value of the library.
        """
        return self._status

    @property
    def resource_manager(self):
        """Default resource manager object for this library.

        Created on first access and cached afterwards.
        """
        if self._resource_manager is None:
            self._resource_manager = ResourceManager(self)
        return self._resource_manager

    def _return_handler(self, ret_value, func, arguments):
        """Check return values for errors and warnings.

        Installed as the `errcheck` callback of the library functions.

        :param ret_value: status code returned by the library function.
        :param func: the library function that was called.
        :param arguments: the arguments it was called with.
        :return: `ret_value`, unchanged.
        :raises errors.VisaIOError: if `ret_value` is negative.
        """
        logger.debug('%s%s -> %s',
                     func.__name__, arguments, ret_value,
                     extra=self._logging_extra)

        self._status = ret_value

        if ret_value < 0:
            raise errors.VisaIOError(ret_value)

        if ret_value in self.issue_warning_on:
            warnings.warn(errors.VisaIOWarning(ret_value), stacklevel=2)

        return ret_value

    def install_handler(self, session, event_type, handler, user_handle=None):
        """Installs handlers for event callbacks.

        :param session: Unique logical identifier to a session.
        :param event_type: Logical event identifier.
        :param handler: Interpreted as a valid reference to a handler to be installed by a client application.
        :param user_handle: A value specified by an application that can be used for identifying handlers
                            uniquely for an event type.
        :returns: user handle (a ctypes object)
        :raises errors.VisaTypeError: if the wrapper rejects the arguments
                                      with a TypeError.
        """
        # NOTE(review): `_install_handler` is attached to the class by
        # `add_visa_methods`; if it is a plain function it is bound to `self`
        # here and receives `self.lib` as an additional positional argument.
        # Confirm against the wrapper module's signature.
        try:
            new_handler = self._install_handler(self.lib, session, event_type, handler, user_handle)
        except TypeError as e:
            raise errors.VisaTypeError(str(e))

        # Keep the tuple alive for as long as the handler is installed.
        self.handlers[session].append(new_handler)
        return new_handler[1]

    def uninstall_handler(self, session, event_type, handler, user_handle=None):
        """Uninstalls handlers for events.

        :param session: Unique logical identifier to a session.
        :param event_type: Logical event identifier.
        :param handler: Interpreted as a valid reference to a handler to be uninstalled by a client application.
        :param user_handle: A value specified by an application that can be used for identifying handlers
                            uniquely in a session for an event.
        :raises errors.UnknownHandler: if no matching handler was installed
                                       through `install_handler`.
        """
        # `.get` avoids creating an empty entry for sessions never seen before.
        registered = self.handlers.get(session, [])
        for ndx, element in enumerate(registered):
            if element[0] is handler and element[1] is user_handle:
                break
        else:
            raise errors.UnknownHandler(event_type, handler, user_handle)

        self._uninstall_handler(self.lib, session, event_type, handler, user_handle)

        # Drop our reference to the ctypes callback only after the library has
        # been told to stop using it.
        del registered[ndx]
class ResourceManager(object):
    """VISA Resource Manager

    Instances are cached per VisaLibrary: asking twice for the same library
    returns the same manager until it is closed.

    :param visa_library: VisaLibrary Instance or path of the VISA library
                         (if not given, the default for the platform will be used).
    """

    #: Maps VisaLibrary instance to ResourceManager
    _registry = dict()

    def __new__(cls, visa_library=None):
        if visa_library is None or isinstance(visa_library, str):
            visa_library = VisaLibrary(visa_library)

        if visa_library in cls._registry:
            return cls._registry[visa_library]

        obj = super(ResourceManager, cls).__new__(cls)

        obj.visalib = visa_library
        # Defined up front so that close()/__del__ stay safe if opening fails.
        obj.session = None
        obj.session = obj.visalib.open_default_resource_manager()

        # Register only after the session was opened successfully.
        cls._registry[visa_library] = obj

        logger.debug('Created ResourceManager with session %s', obj.session)
        return obj

    def __str__(self):
        return 'Resource Manager of %s' % self.visalib

    def __repr__(self):
        return '<ResourceManager(%r)>' % self.visalib

    def __del__(self):
        self.close()

    def close(self):
        """Close the resource manager session, if it is open.

        Calling it on an already closed (or never opened) manager is a no-op.
        A closed manager is removed from the registry so that a later
        `ResourceManager(visa_library)` opens a fresh session.
        """
        session = getattr(self, 'session', None)
        if session is None:
            return

        logger.debug('Closing ResourceManager (session: %s)', session)
        self.visalib.close(session)
        self.session = None

        registry = type(self)._registry
        if registry.get(self.visalib) is self:
            del registry[self.visalib]

    def list_resources(self, query='?*::INSTR'):
        """Returns a tuple of all connected devices matching query.

        :param query: regular expression used to match devices.
        """
        lib = self.visalib

        find_list, return_counter, instrument_description = lib.find_resources(self.session, query)
        try:
            resources = [instrument_description]
            # The first match comes with find_resources; fetch the others.
            for _ in range(return_counter - 1):
                resources.append(lib.find_next(find_list))
        finally:
            # The find list is a handle of its own and has to be released.
            lib.close(find_list)

        return tuple(resources)

    def list_resources_info(self, query='?*::INSTR'):
        """Returns a dictionary mapping resource names to resource extended
        information of all connected devices matching query.

        :param query: regular expression used to match devices.
        :return: Mapping of resource name to ResourceInfo
        :rtype: dict
        """
        return dict((resource, self.resource_info(resource))
                    for resource in self.list_resources(query))

    def resource_info(self, resource_name):
        """Get the extended information of a particular resource

        :param resource_name: Unique symbolic name of a resource.

        :rtype: ResourceInfo
        """
        return self.visalib.parse_resource_extended(self.session, resource_name)

    def open_resource(self, resource_name, access_mode=VI_NO_LOCK, open_timeout=VI_TMO_IMMEDIATE):
        """Open the specified resources.

        :param resource_name: name or alias of the resource to open.
        :param access_mode: access mode.
        :param open_timeout: time out to open.

        :return: Unique logical identifier reference to a session.
        """
        return self.visalib.open(self.session, resource_name, access_mode, open_timeout)

    def get_instrument(self, resource_name, **kwargs):
        """Return an instrument for the resource name.

        The instrument class is chosen from the interface type reported by
        `resource_info`: GpibInstrument for GPIB, SerialInstrument for ASRL
        and Instrument otherwise.

        :param resource_name: name or alias of the resource to open.
        :param kwargs: keyword arguments to be passed to the instrument constructor.
        """
        interface_type = self.resource_info(resource_name).interface_type

        if interface_type == VI_INTF_GPIB:
            return GpibInstrument(resource_name, resource_manager=self, **kwargs)
        elif interface_type == VI_INTF_ASRL:
            return SerialInstrument(resource_name, resource_manager=self, **kwargs)
        else:
            return Instrument(resource_name, resource_manager=self, **kwargs)
class _BaseInstrument(object):
    """Base class for instruments.

    :param resource_name: the VISA name for the resource (eg. "GPIB::10")
                          If None, it's assumed that the resource manager
                          is to be constructed.
    :param resource_manager: A resource manager instance.
                             If None, the default resource manager will be used.
    :param lock: access mode used to open the resource. Default: VI_NO_LOCK
    :param timeout: timeout in seconds for the resource I/O operations.
                    Default: 5

    See :class:Instrument for a detailed description.
    """

    DEFAULT_KWARGS = {'lock': VI_NO_LOCK,
                      'timeout': 5}

    def __init__(self, resource_name=None, resource_manager=None, **kwargs):
        # Defined before anything can fail so that close()/__del__ are always
        # safe to call, even on a partially constructed instrument.
        self.session = None

        warn_for_invalid_kwargs(kwargs, _BaseInstrument.DEFAULT_KWARGS.keys())

        self.resource_manager = resource_manager or get_resource_manager()
        self.visalib = self.resource_manager.visalib
        self._resource_name = resource_name

        self._logging_extra = {'library_path': self.visalib.library_path,
                               'resource_manager.session': self.resource_manager.session,
                               'resource_name': self._resource_name,
                               'session': None}

        self.open(kwargs.get('lock', _BaseInstrument.DEFAULT_KWARGS['lock']))

        for key, value in _BaseInstrument.DEFAULT_KWARGS.items():
            setattr(self, key, kwargs.get(key, value))

    def open(self, lock=None, timeout=5):
        """Opens a session to the specified resource.

        :param lock: Specifies the mode by which the resource is to be accessed.
                     Valid values: VI_NO_LOCK, VI_EXCLUSIVE_LOCK, VI_SHARED_LOCK.
                     If None, `self.lock` is used.
        :param timeout: I/O timeout in seconds applied to the session once it
                        is open (see the `timeout` property). None disables
                        the timeout.
        """
        lock = self.lock if lock is None else lock

        logger.debug('%s - opening ...', self._resource_name, extra=self._logging_extra)
        with warning_context("ignore", "VI_SUCCESS_DEV_NPRESENT"):
            self.session = self.resource_manager.open_resource(self._resource_name, lock)

            if self.visalib.status == VI_SUCCESS_DEV_NPRESENT:
                # okay, the device was not ready when we opened the session.
                # Now it gets five seconds more to become ready.
                # Every 0.1 seconds we probe it with viClear.
                start_time = time.time()
                sleep_time = 0.1
                try_time = 5
                while time.time() - start_time < try_time:
                    time.sleep(sleep_time)
                    try:
                        self.clear()
                        break
                    except errors.VisaIOError as error:
                        # Only "no listeners" means "not ready yet".
                        if error.error_code != VI_ERROR_NLISTENERS:
                            raise

        if timeout is None:
            self.set_visa_attribute(VI_ATTR_TMO_VALUE, VI_TMO_INFINITE)
        else:
            self.timeout = timeout

        self._logging_extra['session'] = self.session
        logger.debug('%s - is open with session %s',
                     self._resource_name, self.session,
                     extra=self._logging_extra)

    def close(self):
        """Closes the VISA session and marks the handle as invalid.

        Does nothing if this session or the resource manager session is
        already closed.
        """
        # Check our own session first: on a partially constructed object it is
        # None and `resource_manager` may not exist yet.
        if self.session is None or self.resource_manager.session is None:
            return

        logger.debug('%s - closing', self._resource_name, extra=self._logging_extra)
        self.visalib.close(self.session)
        logger.debug('%s - is closed', self._resource_name, extra=self._logging_extra)
        self.session = None

    def __del__(self):
        self.close()

    def __str__(self):
        return "%s at %s" % (self.__class__.__name__, self.resource_name)

    def __repr__(self):
        return "<%r(%r)>" % (self.__class__.__name__, self.resource_name)

    def get_visa_attribute(self, name):
        """Return the value of the VISA attribute `name` for this session."""
        return self.visalib.get_attribute(self.session, name)

    def set_visa_attribute(self, name, status):
        """Set the VISA attribute `name` of this session to `status`."""
        self.visalib.set_attribute(self.session, name, status)

    def clear(self):
        """Call the library's `clear` on this session."""
        self.visalib.clear(self.session)

    @property
    def timeout(self):
        """The timeout in seconds for all resource I/O operations.

        Note that the VISA library may round up this value heavily.  I
        experienced that my NI VISA implementation had only the values 0, 1, 3
        and 10 seconds.

        Reading raises NameError if no timeout is set (infinite timeout);
        deleting the property sets the timeout to infinite.
        """
        timeout = self.get_visa_attribute(VI_ATTR_TMO_VALUE)
        if timeout == VI_TMO_INFINITE:
            raise NameError("no timeout is specified")
        return timeout / 1000.0

    @timeout.setter
    def timeout(self, timeout):
        if not (0 <= timeout <= 4294967):
            raise ValueError("timeout value is invalid")
        # The VISA attribute is expressed in milliseconds.
        self.set_visa_attribute(VI_ATTR_TMO_VALUE, int(timeout * 1000))

    @timeout.deleter
    def timeout(self):
        timeout = self.timeout  # just to test whether it's defined
        self.set_visa_attribute(VI_ATTR_TMO_VALUE, VI_TMO_INFINITE)

    @property
    def resource_class(self):
        """The resource class of the resource as a string.

        'Unknown' is returned if the library does not support the attribute.
        """
        # TODO: Check possible outputs.
        try:
            return self.get_visa_attribute(VI_ATTR_RSRC_CLASS).upper()
        except errors.VisaIOError as error:
            if error.error_code != VI_ERROR_NSUP_ATTR:
                raise
        return 'Unknown'

    @property
    def resource_name(self):
        """The VISA resource name of the resource as a string.
        """
        return self.get_visa_attribute(VI_ATTR_RSRC_NAME)

    @property
    def interface_type(self):
        """The interface type of the resource as a number.
        """
        return self.visalib.parse_resource(self.resource_manager.session,
                                           self.resource_name).interface_type

    @contextlib.contextmanager
    def read_termination_context(self, new_termination):
        """Temporarily use the last character of `new_termination` as the
        VISA termination character.

        The previous termination character is restored when the block is
        left, including when it is left through an exception.
        """
        term = self.get_visa_attribute(VI_ATTR_TERMCHAR)
        self.set_visa_attribute(VI_ATTR_TERMCHAR, ord(new_termination[-1]))
        try:
            yield
        finally:
            self.set_visa_attribute(VI_ATTR_TERMCHAR, term)


# The bits in the bitfield mean the following:
#
# bit number   if set / if not set
#     0          binary/ascii
#     1          double/single (IEEE floating point)
#     2          big-endian/little-endian
#
# This leads to the following constants:

ascii = 0
single = 1
double = 3
big_endian = 4

CR = '\r'
LF = '\n'
class Instrument(_BaseInstrument):
    """Class for all kinds of Instruments.

    It can be instantiated, however, if you want to use special features of a
    certain interface system (GPIB, USB, RS232, etc), you must instantiate one
    of its child classes.

    :param resource_name: the instrument's resource name or an alias,
                          may be taken from the list from
                          `list_resources` method from a ResourceManager.
    :param timeout: the VISA timeout for each low-level operation in
                    seconds.
    :param term_chars: (legacy) the termination characters for this device;
                       shorthand for setting both read_termination and
                       write_termination.
    :param read_termination: termination characters expected at the end of
                             each read. Default: None
    :param write_termination: termination characters appended to each
                              written message. Default: CR + LF
    :param chunk_size: size of data packets in bytes that are read from the
                       device.
    :param lock: whether you want to have exclusive access to the device.
                 Default: VI_NO_LOCK
    :param ask_delay: waiting time in seconds after each write command.
                      Default: 0.0
    :param send_end: whether to assert end line after each write command.
                     Default: True
    :param values_format: floating point data value format. Default: ascii (0)
    :param encoding: encoding of the messages. Default: 'ascii'
    """

    #: Termination character sequence (Legacy, to be removed in 1.6).
    __term_chars = None

    DEFAULT_KWARGS = {'read_termination': None,
                      'write_termination': CR + LF,
                      #: How many bytes are read per low-level call.
                      'chunk_size': 20 * 1024,
                      #: Seconds to wait between write and read operations inside ask.
                      'ask_delay': 0.0,
                      'send_end': True,
                      #: floating point data value format
                      'values_format': ascii,
                      #: encoding of the messages
                      'encoding': 'ascii'}

    ALL_KWARGS = dict(DEFAULT_KWARGS, **_BaseInstrument.DEFAULT_KWARGS)

    def __init__(self, resource_name, resource_manager=None, **kwargs):
        # Translate the legacy keyword *before* the kwargs are split, otherwise
        # the values never reach the loop that applies them below.
        term_chars = kwargs.pop('term_chars', None)
        if term_chars:
            kwargs['read_termination'] = term_chars
            kwargs['write_termination'] = term_chars

        skwargs, pkwargs = split_kwargs(kwargs,
                                        Instrument.DEFAULT_KWARGS.keys(),
                                        _BaseInstrument.DEFAULT_KWARGS.keys())

        self._read_termination = None
        self._write_termination = None

        super(Instrument, self).__init__(resource_name, resource_manager, **pkwargs)

        for key, value in Instrument.DEFAULT_KWARGS.items():
            setattr(self, key, skwargs.get(key, value))

        if term_chars:
            self.__term_chars = term_chars

        resource_class = self.resource_class
        if not resource_class:
            warnings.warn("resource class of instrument could not be determined",
                          stacklevel=2)
        elif resource_class not in ("INSTR", "RAW", "SOCKET"):
            warnings.warn("given resource was not an INSTR but %s"
                          % resource_class, stacklevel=2)

    @property
    def encoding(self):
        """Encoding used for read and write operations.
        """
        return self._encoding

    @encoding.setter
    def encoding(self, encoding):
        # Round trip a sample string so that an unknown codec fails here.
        _ = 'test encoding'.encode(encoding).decode(encoding)
        self._encoding = encoding

    @property
    def read_termination(self):
        """Read termination character.
        """
        return self._read_termination

    @read_termination.setter
    def read_termination(self, value):
        if value:
            # Only the last character is the real low-level termination
            # character, the rest is just used for verification after
            # each read operation.
            last_char = value[-1:]
            # Consequently, it's illogical to have the real termination character
            # twice in the sequence (otherwise reading would stop prematurely).
            if last_char in value[:-1]:
                raise ValueError("ambiguous ending in termination characters")

            self.set_visa_attribute(VI_ATTR_TERMCHAR, ord(last_char))
            self.set_visa_attribute(VI_ATTR_TERMCHAR_EN, VI_TRUE)
        else:
            self.set_visa_attribute(VI_ATTR_TERMCHAR_EN, VI_FALSE)

        self._read_termination = value

    @property
    def write_termination(self):
        """Writer termination character.
        """
        return self._write_termination

    @write_termination.setter
    def write_termination(self, value):
        self._write_termination = value

    def write_raw(self, message):
        """Write a bytes message to the device.

        The message is sent as given; nothing is appended to it.

        :param message: the message to be sent.
        :type message: bytes
        :return: number of bytes written.
        :rtype: int
        """
        return self.visalib.write(self.session, message)

    def write(self, message, termination=None, encoding=None):
        """Write a string message to the device.

        The termination characters are always appended to the message. If the
        message already ends with them a warning is issued, since they will
        then be sent twice.

        :param message: the message to be sent.
        :type message: unicode (Py2) or str (Py3)
        :param termination: termination characters to append. If None,
                            `write_termination` is used.
        :param encoding: encoding used to convert the message to bytes.
                         If None, `encoding` is used.
        :return: number of bytes written.
        :rtype: int
        """
        term = self._write_termination if termination is None else termination
        enco = self._encoding if encoding is None else encoding

        if term:
            if message.endswith(term):
                warnings.warn("write message already ends with "
                              "termination characters", stacklevel=2)
            message += term

        count = self.write_raw(message.encode(enco))

        return count

    def read_raw(self, size=None):
        """Read the unmodified string sent from the instrument to the computer.

        In contrast to read(), no termination characters are stripped.

        :param size: number of bytes requested per low-level read. If None,
                     `chunk_size` is used.
        :rtype: bytes
        """
        size = self.chunk_size if size is None else size

        ret = bytes()
        with warning_context("ignore", "VI_SUCCESS_MAX_CNT"):
            try:
                # VI_SUCCESS_MAX_CNT: the chunk was filled, more may follow.
                status = VI_SUCCESS_MAX_CNT
                while status == VI_SUCCESS_MAX_CNT:
                    logger.debug('%s - reading %d bytes (last status %r)',
                                 self._resource_name, size, status)
                    ret += self.visalib.read(self.session, size)
                    status = self.visalib.status
            except errors.VisaIOError as e:
                logger.debug('%s - exception while reading: %s', self._resource_name, e)
                raise

        return ret

    def read(self, termination=None, encoding=None):
        """Read a string from the device.

        Reading stops when the device stops sending (e.g. by setting
        appropriate bus lines), or the termination characters sequence was
        detected.  Attention: Only the last character of the termination
        characters is really used to stop reading, however, the whole sequence
        is compared to the ending of the read string message.  If they don't
        match, a warning is issued and the message is returned without
        removing anything.

        If the legacy `term_chars` is unset, all CR and LF characters are
        stripped from the end of the string instead.

        :param termination: termination characters for this read. If None,
                            `read_termination` is used.
        :param encoding: encoding used to decode the received bytes.
                         If None, `encoding` is used.
        :rtype: str
        """
        enco = self._encoding if encoding is None else encoding

        if termination is None:
            termination = self._read_termination
            message = self.read_raw().decode(enco)
        else:
            with self.read_termination_context(termination):
                message = self.read_raw().decode(enco)

        if not termination:
            return message

        terminated = message.endswith(termination)
        if not terminated:
            warnings.warn("read string doesn't end with "
                          "termination characters", stacklevel=2)

        if self.__term_chars is None:
            return message.rstrip(CR + LF)

        # Only cut the terminator off when it is really there; otherwise the
        # slice would throw away payload characters.
        if terminated:
            return message[:-len(termination)]
        return message

    def read_values(self, fmt=None):
        """Read a list of floating point values from the device.

        :param fmt: the format of the values.  If given, it overrides
            the class attribute "values_format".  Possible values are bitwise
            disjunctions of the above constants ascii, single, double, and
            big_endian.  Default is ascii.

        :return: the list of read values
        :rtype: list
        :raises errors.InvalidBinaryFormat: if the binary data cannot be
                                            parsed.
        """
        # `ascii` is 0, so test against None to let callers request it
        # explicitly even when the instance default is a binary format.
        if fmt is None:
            fmt = self.values_format

        if fmt & 0x01 == ascii:
            return parse_ascii(self.read())

        data = self.read_raw()

        try:
            if fmt & 0x03 == single:
                is_single = True
            elif fmt & 0x03 == double:
                is_single = False
            else:
                raise ValueError("unknown data values fmt requested")
            return parse_binary(data, fmt & 0x04 == big_endian, is_single)
        except ValueError as e:
            raise errors.InvalidBinaryFormat(e.args)

    def ask(self, message, delay=None):
        """A combination of write(message) and read()

        :param message: the message to send.
        :type message: str
        :param delay: delay in seconds between write and read operations.
                      if None, defaults to self.ask_delay
        :returns: the answer from the device.
        :rtype: str
        """
        self.write(message)
        if delay is None:
            delay = self.ask_delay
        if delay > 0.0:
            time.sleep(delay)
        return self.read()

    def ask_for_values(self, message, format=None, delay=None):
        """A combination of write(message) and read_values()

        :param message: the message to send.
        :type message: str
        :param format: the format of the values, passed on to read_values().
        :param delay: delay in seconds between write and read operations.
                      if None, defaults to self.ask_delay
        :returns: the answer from the device.
        :rtype: list
        """
        self.write(message)
        if delay is None:
            delay = self.ask_delay
        if delay > 0.0:
            time.sleep(delay)
        return self.read_values(format)

    def trigger(self):
        """Sends a software trigger to the device.
        """
        self.set_visa_attribute(VI_ATTR_TRIG_ID, VI_TRIG_SW)
        self.visalib.assert_trigger(self.session, VI_TRIG_PROT_DEFAULT)

    @property
    def term_chars(self):
        """Set or read a new termination character sequence (property).

        Normally, you just give the new termination sequence, which is appended
        to each write operation, and expected as the ending mark during each
        read operation.  A typical example is CR+LF or just CR.  If you assign
        "" to this property, the termination sequence is deleted.

        The default is None, which means that CR + LF is appended to each write
        operation but not expected after each read operation (but stripped if
        present).
        """
        return self.__term_chars

    @term_chars.setter
    def term_chars(self, term_chars):
        # First, reset termination characters, in case something bad happens.
        self.__term_chars = ""

        if term_chars == "" or term_chars is None:
            self.read_termination = None
            self.write_termination = CR + LF
            self.__term_chars = None
            return

        # Only the last character in term_chars is the real low-level
        # termination character (see the read_termination setter).
        self.read_termination = term_chars
        self.write_termination = term_chars
        # Remember the sequence so that reading the property returns it.
        self.__term_chars = term_chars

    @term_chars.deleter
    def term_chars(self):
        self.term_chars = None

    @property
    def send_end(self):
        """Whether or not to assert EOI (or something equivalent) after each
        write operation.
        """
        return self.get_visa_attribute(VI_ATTR_SEND_END_EN) == VI_TRUE

    @send_end.setter
    def send_end(self, send):
        self.set_visa_attribute(VI_ATTR_SEND_END_EN, VI_TRUE if send else VI_FALSE)
class GpibInstrument(Instrument):
    """Class for GPIB instruments.

    This class extents the Instrument class with special operations and
    properties of GPIB instruments.

    :param gpib_identifier: strings are interpreted as instrument's VISA resource name.
                            Numbers are interpreted as GPIB number.
    :param board_number: the number of the GPIB bus.

    Further keyword arguments are passed to the constructor of class
    Instrument.

    :raises ValueError: if the opened resource is not a GPIB interface.
    """

    def __init__(self, gpib_identifier, board_number=0, resource_manager=None, **keyw):
        warn_for_invalid_kwargs(keyw, Instrument.ALL_KWARGS.keys())
        if isinstance(gpib_identifier, int):
            resource_name = "GPIB%d::%d" % (board_number, gpib_identifier)
        else:
            resource_name = gpib_identifier

        super(GpibInstrument, self).__init__(resource_name, resource_manager, **keyw)

        # Now check whether the instrument is really valid
        if self.interface_type != VI_INTF_GPIB:
            raise ValueError("device is not a GPIB instrument")

        self.visalib.enable_event(self.session, VI_EVENT_SERVICE_REQ, VI_QUEUE)

    def __del__(self):
        # `session` may be missing if construction failed early.
        if getattr(self, 'session', None) is not None:
            self.__switch_events_off()
            super(GpibInstrument, self).__del__()

    def __switch_events_off(self):
        """Disable and discard all enabled events of this session."""
        self.visalib.disable_event(self.session, VI_ALL_ENABLED_EVENTS, VI_ALL_MECH)
        self.visalib.discard_events(self.session, VI_ALL_ENABLED_EVENTS, VI_ALL_MECH)

    def wait_for_srq(self, timeout=25):
        """Wait for a serial request (SRQ) coming from the instrument.

        Note that this method is not ended when *another* instrument signals an
        SRQ, only *this* instrument.

        :param timeout: the maximum waiting time in seconds.
                        Defaul: 25 (seconds).
                        None means waiting forever if necessary.
        :raises ValueError: if `timeout` is outside the accepted range.
        """
        lib = self.visalib

        lib.enable_event(self.session, VI_EVENT_SERVICE_REQ, VI_QUEUE)

        if timeout and not(0 <= timeout <= 4294967):
            raise ValueError("timeout value is invalid")

        # Wall-clock time: the timeout has to elapse while we are blocked in
        # the library, which consumes no CPU time.
        starting_time = time.time()

        while True:
            if timeout is None:
                adjusted_timeout = VI_TMO_INFINITE
            else:
                # Remaining time in milliseconds, never negative.
                adjusted_timeout = int((starting_time + timeout - time.time()) * 1000)
                if adjusted_timeout < 0:
                    adjusted_timeout = 0

            event_type, context = lib.wait_on_event(self.session, VI_EVENT_SERVICE_REQ,
                                                    adjusted_timeout)
            lib.close(context)

            # Bit 6 of the status byte is checked to tell whether it was this
            # instrument that requested service.
            if self.stb & 0x40:
                break

        lib.discard_events(self.session, VI_EVENT_SERVICE_REQ, VI_QUEUE)

    @property
    def stb(self):
        """Service request status register."""
        return self.visalib.read_stb(self.session)


# The following aliases are used for the "end_input" property
no_end_input = VI_ASRL_END_NONE
last_bit_end_input = VI_ASRL_END_LAST_BIT
term_chars_end_input = VI_ASRL_END_TERMCHAR

# The following aliases are used for the "parity" property
no_parity = VI_ASRL_PAR_NONE
odd_parity = VI_ASRL_PAR_ODD
even_parity = VI_ASRL_PAR_EVEN
mark_parity = VI_ASRL_PAR_MARK
space_parity = VI_ASRL_PAR_SPACE
class SerialInstrument(Instrument):
    """Class for serial (RS232 or parallel port) instruments.  Not USB!

    This class extents the Instrument class with special operations and
    properties of serial instruments.

    :param resource_name: the instrument's resource name or an alias, may be
        taken from the list from `list_resources` method from a ResourceManager.

    Further keyword arguments are passed to the constructor of class
    Instrument. Unless given, both read and write termination default to CR.
    """

    DEFAULT_KWARGS = {'baud_rate': 9600,
                      'data_bits': 8,
                      'stop_bits': 1,
                      'parity': no_parity,
                      'end_input': term_chars_end_input}

    def __init__(self, resource_name, resource_manager=None, **keyw):
        own_kwargs, parent_kwargs = split_kwargs(keyw,
                                                 SerialInstrument.DEFAULT_KWARGS.keys(),
                                                 Instrument.ALL_KWARGS.keys())

        # Serial devices default to a bare CR in both directions.
        for option in ("read_termination", "write_termination"):
            parent_kwargs.setdefault(option, CR)

        super(SerialInstrument, self).__init__(resource_name, resource_manager,
                                               **parent_kwargs)

        # Now check whether the instrument is really valid
        if self.interface_type != VI_INTF_ASRL:
            raise ValueError("device is not a serial instrument")

        # Apply the serial settings through the properties defined below.
        for name, default in SerialInstrument.DEFAULT_KWARGS.items():
            setattr(self, name, own_kwargs.get(name, default))

    @property
    def baud_rate(self):
        """The baud rate of the serial instrument.
        """
        return self.get_visa_attribute(VI_ATTR_ASRL_BAUD)

    @baud_rate.setter
    def baud_rate(self, rate):
        self.set_visa_attribute(VI_ATTR_ASRL_BAUD, rate)

    @property
    def data_bits(self):
        """Number of data bits contained in each frame (from 5 to 8).
        """
        return self.get_visa_attribute(VI_ATTR_ASRL_DATA_BITS)

    @data_bits.setter
    def data_bits(self, bits):
        in_range = 5 <= bits <= 8
        if not in_range:
            raise ValueError("number of data bits must be from 5 to 8")
        self.set_visa_attribute(VI_ATTR_ASRL_DATA_BITS, bits)

    @property
    def stop_bits(self):
        """Number of stop bits contained in each frame (1, 1.5, or 2).

        The VISA attribute holds ten times the number of stop bits; None is
        returned for any other attribute value.
        """
        deci_bits = self.get_visa_attribute(VI_ATTR_ASRL_STOP_BITS)
        for raw, stop_bits in ((10, 1), (15, 1.5), (20, 2)):
            if deci_bits == raw:
                return stop_bits
        return None

    @stop_bits.setter
    def stop_bits(self, bits):
        scaled = 10 * bits
        # Accept anything strictly within one unit of a supported value.
        for nominal in (10, 15, 20):
            if nominal - 1 < scaled < nominal + 1:
                self.set_visa_attribute(VI_ATTR_ASRL_STOP_BITS, nominal)
                return
        raise ValueError("invalid number of stop bits")

    @property
    def parity(self):
        """The parity used with every frame transmitted and received."""
        return self.get_visa_attribute(VI_ATTR_ASRL_PARITY)

    @parity.setter
    def parity(self, parity):
        self.set_visa_attribute(VI_ATTR_ASRL_PARITY, parity)

    @property
    def end_input(self):
        """indicates the method used to terminate read operations"""
        return self.get_visa_attribute(VI_ATTR_ASRL_END_IN)

    @end_input.setter
    def end_input(self, termination):
        self.set_visa_attribute(VI_ATTR_ASRL_END_IN, termination)
def get_instruments_list(use_aliases=True):
    """Get a list of all connected devices.

    This function is kept for backwards compatibility with PyVISA < 1.5.

    Use::

        >>> rm = ResourceManager()
        >>> rm.list_resources()

    or::

        >>> rm = ResourceManager()
        >>> rm.list_resources_info()

    in the future.

    :param use_aliases: if True (default), return the device alias if it has one.
                        Otherwise, always return the standard resource name
                        like "GPIB::10".

    :return: A list of strings with the names of all connected devices,
             ready for being used to open each of them.
    :rtype: list
    """
    manager = get_resource_manager()

    if use_aliases:
        return [info.alias or resource_name
                for resource_name, info in manager.list_resources_info().items()]

    # Convert so that both branches return a list, as documented.
    return list(manager.list_resources())


def instrument(resource_name, **kwargs):
    """Factory function for instrument instances.

    :param resource_name: the VISA resource name of the device.
                          It may be an alias.
    :param kwargs: keyword argument for the class constructor of the device instance
                   to be generated.  See the class Instrument for further information.

    :return: The generated instrument instance.
    """
    return get_resource_manager().get_instrument(resource_name, **kwargs)


#: Module wide default resource manager, created on first use by
#: `get_resource_manager`.
resource_manager = None


def get_resource_manager():
    """Return the module wide default ResourceManager.

    It is created on the first call and registered to be closed when the
    interpreter exits.
    """
    global resource_manager
    if resource_manager is None:
        resource_manager = ResourceManager()
        atexit.register(resource_manager.__del__)
    return resource_manager