Conversation
| # Dictionary to map sessions to their event handlers | ||
| # Structure: { session_id: { event_type: [ (handler, user_handle), ... ] } } | ||
| self.handlers = {} |
There was a problem hiding this comment.
This does not need to live here and can simply exist on each session.
| def enable_event(self, session, event_type, mechanism, context=None): | ||
| """Enables notification for an event.""" | ||
|
|
||
| # 1. Retrieve the actual session object (e.g., USBSession, TCPIPSession) | ||
| # In pyvisa-py, sessions are usually stored in self.sessions | ||
| obj = self.sessions[session] | ||
|
|
||
| # 2. Start a listener thread for this specific event/session | ||
| if event_type == constants.VI_EVENT_SERVICE_REQ: | ||
| # This is a hypothetical method you must add to the specific session classes | ||
| # (e.g., in pyvisa_py/usb.py or pyvisa_py/tcpip.py) | ||
| if hasattr(obj, "start_srq_listener"): | ||
| obj.start_srq_listener(callback=self._dispatch_event) | ||
| else: | ||
| return constants.VI_ERROR_NSUP_OPER | ||
| return constants.VI_SUCCESS |
There was a problem hiding this comment.
Rather than testing for specific methods on the session object. I would expect to have a method enable event on the session which is not implemented by default (i.e. return constants.VI_ERROR_NSUP_OPER, while logging a nicer message)
| def _dispatch_event(self, session_id, event_type): | ||
| """Internal method called by the listener thread when hardware triggers.""" | ||
| if session_id in self.handlers and event_type in self.handlers[session_id]: | ||
| for handler, user_handle in self.handlers[session_id][event_type]: | ||
| # Call the user's registered function | ||
| # Note: VISA handlers require specific arguments: (session, event_type, event_context, user_handle) | ||
| # You may need to create a dummy event_context object here | ||
| handler(session_id, event_type, None, user_handle) | ||
|
|
There was a problem hiding this comment.
This housl be fully handled within the session
| import threading | ||
|
|
There was a problem hiding this comment.
I do not believe this will be needed at the library level.
|
|
||
| def install_handler(self, session, event_type, handler, user_handle=None): | ||
| """Stores the handler for a specific session and event type.""" | ||
| try: | ||
| # Ensure the session entry exists | ||
| if session not in self.handlers: | ||
| self.handlers[session] = {} | ||
|
|
||
| # Ensure the event_type entry exists | ||
| if event_type not in self.handlers[session]: | ||
| self.handlers[session][event_type] = [] | ||
|
|
||
| # Store the handler and the user_handle | ||
| self.handlers[session][event_type].append((handler, user_handle)) | ||
|
|
||
| return constants.VI_SUCCESS # TODO: use existing status codes? | ||
| except Exception as e: | ||
| return constants.VI_ERROR_SYSTEM_ERROR # TODO: use existing status codes? | ||
|
|
||
| def uninstall_handler(self, session, event_type, handler, user_handle=None): | ||
| """ | ||
| Removes the callback handler for the specified session and event type. | ||
| """ | ||
| try: | ||
| # 1. Check if the session and event type exist in the registry | ||
| if session not in self.handlers or event_type not in self.handlers[session]: | ||
| # Standard VISA behavior is to return success if the handler | ||
| # effectively isn't there, or a warning. | ||
| return constants.VI_SUCCESS | ||
|
|
||
| # 2. Filter out the specific handler to be removed | ||
| # We keep only the entries that DO NOT match the (handler, user_handle) pair. | ||
| # This handles the case where multiple different handlers are registered for the same event. | ||
| original_list = self.handlers[session][event_type] | ||
|
|
||
| # Check if the user passed a wildcard (Generic VISA behavior) | ||
| # If handler is generic, we might want to clear all, but usually | ||
| # users uninstall specific functions. | ||
| new_list = [ | ||
| (h, uh) | ||
| for (h, uh) in original_list | ||
| if not (h == handler and uh == user_handle) | ||
| ] | ||
|
|
||
| # If the list length didn't change, the handler wasn't found. | ||
| if len(new_list) == len(original_list): | ||
| return constants.VI_WARN_UNKNOWN_STATUS | ||
|
|
||
| # 3. Update the registry | ||
| self.handlers[session][event_type] = new_list | ||
|
|
||
| # 4. Cleanup (Optional optimization) | ||
| # If no handlers remain for this event type, we can delete the key. | ||
| if not new_list: | ||
| del self.handlers[session][event_type] | ||
|
|
||
| # IMPROVEMENT: If you want to automatically stop the TCP listener thread | ||
| # to save resources when the last handler is removed, you would | ||
| # call disable_event here. | ||
| # self.disable_event(session, event_type, mechanism) | ||
|
|
||
| return constants.VI_SUCCESS | ||
|
|
||
| except Exception: | ||
| return constants.VI_ERROR_SYSTEM_ERROR |
There was a problem hiding this comment.
This should be delegated to the session.
| import struct | ||
| import threading | ||
|
|
||
| # Constants for Parsing | ||
| DEVICE_INTR_SRQ_PROC = 1 | ||
| RPC_MSG_CALL = 0 | ||
|
|
||
| # VXI-11 Constants | ||
| DEVICE_CORE_PROG = 0x0607B0 | ||
| DEVICE_CORE_VERS = 1 | ||
| CREATE_INTR_CHAN_PROC = 12 | ||
| DEVICE_INTR_PROG = 0x0607AF # The ID the device uses to call us back TODO: does this need to be dynamically assigned? | ||
| DEVICE_INTR_VERS = 1 | ||
| IPPROTO_TCP = 6 | ||
|
|
There was a problem hiding this comment.
This should come from the vxi11 module (or be defined there if those definition do not exist there).
| def pack_uint(n): | ||
| """Pack a 32-bit unsigned integer (Big Endian).""" | ||
| return struct.pack(">I", n) | ||
|
|
||
|
|
||
| def pack_string(s): | ||
| """Pack a variable length string (XDR format: Length + Data + Padding).""" | ||
| if isinstance(s, str): | ||
| s = s.encode("ascii") | ||
| length = len(s) | ||
| padding = (4 - (length % 4)) % 4 | ||
| return struct.pack(">I", length) + s + (b"\x00" * padding) | ||
|
|
||
|
|
||
| def build_create_intr_chan_packet(host_ip, host_port, xid=None): | ||
| """ | ||
| Constructs the VXI-11 'create_intr_chan' RPC packet. | ||
|
|
||
| Args: | ||
| host_ip (str): The IP address of YOUR computer (where the device should connect). | ||
| host_port (int): The port YOUR computer is listening on. | ||
| """ | ||
| if xid is None: | ||
| xid = random.randint(1, 0xFFFFFFFF) | ||
|
|
||
| # --- 1. RPC Header --- | ||
| # Field: [XID] [MsgType:Call=0] [RPCVers:2] [Prog:Core] [Vers:1] [Proc:CreateIntr] | ||
| header = ( | ||
| pack_uint(xid) # Transaction ID | ||
| + pack_uint(0) # Message Type (0 = Call) | ||
| + pack_uint(2) # RPC Version (2) | ||
| + pack_uint(DEVICE_CORE_PROG) # Program (Device Core) | ||
| + pack_uint(DEVICE_CORE_VERS) # Program Version | ||
| + pack_uint(CREATE_INTR_CHAN_PROC) # Procedure (12 = create_intr_chan) | ||
| ) | ||
|
|
||
| # --- 2. Authentication (None) --- | ||
| # Field: [Cred Flavor:0] [Cred Len:0] [Verf Flavor:0] [Verf Len:0] | ||
| auth = ( | ||
| pack_uint(0) | ||
| + pack_uint(0) # Credentials (Auth_None) | ||
| + pack_uint(0) | ||
| + pack_uint(0) # Verifier (Auth_None) | ||
| ) | ||
|
|
||
| # --- 3. Arguments --- | ||
| # The arguments defined in VXI-11 spec for create_intr_chan: | ||
| # (host_addr, host_port, prog_num, prog_vers, prog_prot) | ||
| args = ( | ||
| pack_string(host_ip) # arg1: Host Address (String) | ||
| + pack_uint(host_port) # arg2: Host Port (Uint) | ||
| + pack_uint(DEVICE_INTR_PROG) # arg3: Program Number (0x0607AF) | ||
| + pack_uint(DEVICE_INTR_VERS) # arg4: Program Version (1) | ||
| + pack_uint(IPPROTO_TCP) # arg5: Protocol (6 = TCP) | ||
| ) | ||
|
|
||
| # --- 4. Final Framing (Record Marking) --- | ||
| # ONC RPC over TCP requires a 4-byte fragment header. | ||
| # The top bit (0x80000000) indicates this is the "Last Fragment". | ||
| payload = header + auth + args | ||
| fragment_header = 0x80000000 | len(payload) | ||
|
|
||
| return struct.pack(">I", fragment_header) + payload | ||
|
|
||
|
|
There was a problem hiding this comment.
Everything vxi11 specific should be in the vxi11 module.
| # We pass the Session ID, Event Type, and the STB | ||
| # Note: PyVISA handlers usually expect (session, event_type, context, user_handle) | ||
| # We can pass the STB inside a context object or wrap it if needed. | ||
| callback( |
There was a problem hiding this comment.
The callback should be stored on the session by the sessions install handler method.
| def _send_create_intr_chan_rpc(self, host_ip, host_port): | ||
| """Sends the raw RPC packet to the device.""" | ||
|
|
||
| # 1. Build the packet | ||
| packet = build_create_intr_chan_packet(host_ip, host_port) | ||
|
|
||
| # 2. Send it over the EXISTING connection to the device | ||
| # self.interface is usually the socket object in pyvisa-py TCPIPSession | ||
| # Note: You might need to check how pyvisa-py stores the socket. | ||
| # It is often in self.visalib.sessions[self.session].interface | ||
|
|
||
| # Assuming 'self.interface' is the active socket to the instrument: | ||
| self.interface.sendall(packet) | ||
|
|
||
| # 3. Read the RPC Reply (Important!) | ||
| # We need to ensure the device accepted it. | ||
| # Read the 4-byte fragment header first | ||
| header_data = self.interface.recv(4) | ||
| frag_len = struct.unpack(">I", header_data)[0] & 0x7FFFFFFF | ||
|
|
||
| # Read the rest of the reply | ||
| reply_data = self.interface.recv(frag_len) | ||
|
|
||
| # 4. Parse Reply (Simplified) | ||
| # Skip XID(4), MsgType(4), ReplyState(4), Verifier(8), AcceptStatus(4) | ||
| # We just want to check if AcceptStatus is 0 (SUCCESS) | ||
| # This is at offset 24 (4+4+4+8) | ||
| accept_status = struct.unpack(">I", reply_data[24:28])[0] | ||
|
|
||
| if accept_status != 0: | ||
| return -1 # RPC Error | ||
|
|
||
| # The actual return value of create_intr_chan is an "Error" object (integer) | ||
| # It is usually the last 4 bytes of the packet | ||
| error_code = struct.unpack(">I", reply_data[-4:])[0] | ||
|
|
||
| return error_code |
There was a problem hiding this comment.
This can probably take advantage of the Packer existing in the vxi11 module.
| def _validate_and_handle_srq(self, conn): | ||
| """ | ||
| Reads the incoming RPC packet, validates it is an SRQ, | ||
| extracts the Status Byte, and sends a Success Reply. | ||
|
|
||
| Returns: | ||
| status_byte (int) if valid SRQ, None otherwise. | ||
| """ | ||
| # 1. Read the Fragment Header (4 bytes) | ||
| header_data = conn.recv(4) | ||
| if not header_data: | ||
| return None | ||
|
|
||
| # The last 31 bits are the length | ||
| frag_len = struct.unpack(">I", header_data)[0] & 0x7FFFFFFF | ||
|
|
||
| # 2. Read the RPC Packet | ||
| data = conn.recv(frag_len) | ||
|
|
||
| if len(data) < 24: | ||
| return None # Too short to be a valid RPC Call | ||
|
|
||
| # 3. Unpack Key Fields | ||
| # Struct: [XID:4] [MsgType:4] [RPCVer:4] [Prog:4] [ProgVer:4] [Proc:4] | ||
| xid, msg_type, rpc_vers, prog, prog_vers, proc = struct.unpack( | ||
| ">IIIIII", data[:24] | ||
| ) | ||
|
|
||
| # 4. Validate Context | ||
| if ( | ||
| msg_type == RPC_MSG_CALL | ||
| and prog == DEVICE_INTR_PROG | ||
| and proc == DEVICE_INTR_SRQ_PROC | ||
| ): | ||
| # This IS a valid SRQ! | ||
|
|
||
| # 5. Extract the Status Byte (STB) | ||
| # The arguments start after the Creds and Verifier. | ||
| # We need to skip Creds(8 bytes) + Verifier(8 bytes) = 16 bytes. | ||
| # Standard RPC header is 24 bytes. Total offset = 40 bytes. | ||
| # The argument is the Status Byte (passed as an int). | ||
| try: | ||
| stb = struct.unpack(">I", data[40:44])[0] | ||
| except: | ||
| stb = 0 # Default if parsing fails | ||
|
|
||
| # 6. SEND THE REPLY (Mandatory!) | ||
| # We must tell the instrument we received the message. | ||
| self._send_srq_reply(conn, xid) | ||
|
|
||
| return stb | ||
|
|
||
| return None | ||
|
|
||
| def _send_srq_reply(self, conn, xid): | ||
| """Sends a successful RPC reply back to the instrument.""" | ||
|
|
||
| # Structure: [XID] [MsgType:Reply=1] [ReplyState:Accepted=0] | ||
| # [Verf:None] [AcceptStatus:Success=0] | ||
|
|
||
| reply_payload = ( | ||
| struct.pack(">I", xid) # Match the XID from the Request | ||
| + struct.pack(">I", 1) # MsgType: Reply | ||
| + struct.pack(">I", 0) # ReplyState: Accepted | ||
| + struct.pack(">I", 0) # Verifier Flavor (None) | ||
| + struct.pack(">I", 0) # Verifier Length (0) | ||
| + struct.pack(">I", 0) # AcceptStatus: Success | ||
| ) | ||
|
|
||
| # Add Fragment Header (Last Fragment bit set) | ||
| frag_header = 0x80000000 | len(reply_payload) | ||
| full_packet = struct.pack(">I", frag_header) + reply_payload | ||
|
|
||
| conn.sendall(full_packet) |
There was a problem hiding this comment.
Same using vxi11 module to build this would make more sense.
|
@MatthieuDartiailh thank you! I will try and make changes as commented but I may need to come back and ask for more guidance. |
black . && isort -c . && flake8with no errorsAs requested by @MatthieuDartiailh in #544 this PR attempts to implement the suggestions in the gist linked in that issue, as a basis for introducing TCPIP::INST event handlers. I do not expect to close #544 but link to it anyway as this is the issue of interest for this PR.
I'm not sure if there is a branch other than main I should be targeting with this PR.
I have not added an entry to CHANGES as I fully expect this to require a lot of edits before it works and passes testing.
I have not written unit tests but have included a test script based on the Gemini suggestion. I have not run the formatting tools as I'm not sure how to and pylance in VSCode showed no issues with typing nor formatting. I realise the scope of these tools is not likely a complete match with pylance.