By using a socketpair for all of the sockets managed by the VM class and its extensions, we don't need the sock_dir argument anymore, so remove it. We only added this argument so that we could specify a second, shorter temporary directory for cases where the temp/log dirs were "too long" as a socket name on macOS. We don't need it for this class now. In one case, avocado testing takes over responsibility for creating an appropriate sockdir. Signed-off-by: John Snow <jsnow@redhat.com> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> Message-id: 20230928044943.849073-7-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
		
			
				
	
	
		
			192 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			192 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
QEMU qtest library
 | 
						|
 | 
						|
qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
 | 
						|
offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
 | 
						|
subclass of QEMUMachine, respectively.
 | 
						|
"""
 | 
						|
 | 
						|
# Copyright (C) 2015 Red Hat Inc.
 | 
						|
#
 | 
						|
# Authors:
 | 
						|
#  Fam Zheng <famz@redhat.com>
 | 
						|
#
 | 
						|
# This work is licensed under the terms of the GNU GPL, version 2.  See
 | 
						|
# the COPYING file in the top-level directory.
 | 
						|
#
 | 
						|
# Based on qmp.py.
 | 
						|
#
 | 
						|
 | 
						|
import os
 | 
						|
import socket
 | 
						|
from typing import (
 | 
						|
    List,
 | 
						|
    Optional,
 | 
						|
    Sequence,
 | 
						|
    TextIO,
 | 
						|
    Tuple,
 | 
						|
)
 | 
						|
 | 
						|
from qemu.qmp import SocketAddrT
 | 
						|
 | 
						|
from .machine import QEMUMachine
 | 
						|
 | 
						|
 | 
						|
class QEMUQtestProtocol:
 | 
						|
    """
 | 
						|
    QEMUQtestProtocol implements a connection to a qtest socket.
 | 
						|
 | 
						|
    :param address: QEMU address, can be either a unix socket path (string)
 | 
						|
                    or a tuple in the form ( address, port ) for a TCP
 | 
						|
                    connection
 | 
						|
    :param sock: An existing socket can be provided as an alternative to
 | 
						|
                 an address. One of address or sock must be provided.
 | 
						|
    :param server: server mode, listens on the socket. Only meaningful
 | 
						|
                   in conjunction with an address and not an existing
 | 
						|
                   socket.
 | 
						|
 | 
						|
    :raise socket.error: on socket connection errors
 | 
						|
 | 
						|
    .. note::
 | 
						|
       No connection is established by __init__(), this is done
 | 
						|
       by the connect() or accept() methods.
 | 
						|
    """
 | 
						|
    def __init__(self,
 | 
						|
                 address: Optional[SocketAddrT] = None,
 | 
						|
                 sock: Optional[socket.socket] = None,
 | 
						|
                 server: bool = False):
 | 
						|
        if address is None and sock is None:
 | 
						|
            raise ValueError("Either 'address' or 'sock' must be specified")
 | 
						|
        if address is not None and sock is not None:
 | 
						|
            raise ValueError(
 | 
						|
                "Either 'address' or 'sock' must be specified, but not both")
 | 
						|
        if sock is not None and server:
 | 
						|
            raise ValueError("server=True is meaningless when passing socket")
 | 
						|
 | 
						|
        self._address = address
 | 
						|
        self._sock = sock or self._get_sock()
 | 
						|
        self._sockfile: Optional[TextIO] = None
 | 
						|
 | 
						|
        if server:
 | 
						|
            assert self._address is not None
 | 
						|
            self._sock.bind(self._address)
 | 
						|
            self._sock.listen(1)
 | 
						|
 | 
						|
    def _get_sock(self) -> socket.socket:
 | 
						|
        assert self._address is not None
 | 
						|
        if isinstance(self._address, tuple):
 | 
						|
            family = socket.AF_INET
 | 
						|
        else:
 | 
						|
            family = socket.AF_UNIX
 | 
						|
        return socket.socket(family, socket.SOCK_STREAM)
 | 
						|
 | 
						|
    def connect(self) -> None:
 | 
						|
        """
 | 
						|
        Connect to the qtest socket.
 | 
						|
 | 
						|
        @raise socket.error on socket connection errors
 | 
						|
        """
 | 
						|
        if self._address is not None:
 | 
						|
            self._sock.connect(self._address)
 | 
						|
        self._sockfile = self._sock.makefile(mode='r')
 | 
						|
 | 
						|
    def accept(self) -> None:
 | 
						|
        """
 | 
						|
        Await connection from QEMU.
 | 
						|
 | 
						|
        @raise socket.error on socket connection errors
 | 
						|
        """
 | 
						|
        self._sock, _ = self._sock.accept()
 | 
						|
        self._sockfile = self._sock.makefile(mode='r')
 | 
						|
 | 
						|
    def cmd(self, qtest_cmd: str) -> str:
 | 
						|
        """
 | 
						|
        Send a qtest command on the wire.
 | 
						|
 | 
						|
        @param qtest_cmd: qtest command text to be sent
 | 
						|
        """
 | 
						|
        assert self._sockfile is not None
 | 
						|
        self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
 | 
						|
        resp = self._sockfile.readline()
 | 
						|
        return resp
 | 
						|
 | 
						|
    def close(self) -> None:
 | 
						|
        """
 | 
						|
        Close this socket.
 | 
						|
        """
 | 
						|
        self._sock.close()
 | 
						|
        if self._sockfile:
 | 
						|
            self._sockfile.close()
 | 
						|
            self._sockfile = None
 | 
						|
 | 
						|
    def settimeout(self, timeout: Optional[float]) -> None:
 | 
						|
        """Set a timeout, in seconds."""
 | 
						|
        self._sock.settimeout(timeout)
 | 
						|
 | 
						|
 | 
						|
class QEMUQtestMachine(QEMUMachine):
 | 
						|
    """
 | 
						|
    A QEMU VM, with a qtest socket available.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self,
 | 
						|
                 binary: str,
 | 
						|
                 args: Sequence[str] = (),
 | 
						|
                 wrapper: Sequence[str] = (),
 | 
						|
                 name: Optional[str] = None,
 | 
						|
                 base_temp_dir: str = "/var/tmp",
 | 
						|
                 qmp_timer: Optional[float] = None):
 | 
						|
        # pylint: disable=too-many-arguments
 | 
						|
 | 
						|
        if name is None:
 | 
						|
            name = "qemu-%d" % os.getpid()
 | 
						|
        super().__init__(binary, args, wrapper=wrapper, name=name,
 | 
						|
                         base_temp_dir=base_temp_dir,
 | 
						|
                         qmp_timer=qmp_timer)
 | 
						|
        self._qtest: Optional[QEMUQtestProtocol] = None
 | 
						|
        self._qtest_sock_pair: Optional[
 | 
						|
            Tuple[socket.socket, socket.socket]] = None
 | 
						|
 | 
						|
    @property
 | 
						|
    def _base_args(self) -> List[str]:
 | 
						|
        args = super()._base_args
 | 
						|
        assert self._qtest_sock_pair is not None
 | 
						|
        fd = self._qtest_sock_pair[0].fileno()
 | 
						|
        args.extend([
 | 
						|
            '-chardev', f"socket,id=qtest,fd={fd}",
 | 
						|
            '-qtest', 'chardev:qtest',
 | 
						|
            '-accel', 'qtest'
 | 
						|
        ])
 | 
						|
        return args
 | 
						|
 | 
						|
    def _pre_launch(self) -> None:
 | 
						|
        self._qtest_sock_pair = socket.socketpair()
 | 
						|
        os.set_inheritable(self._qtest_sock_pair[0].fileno(), True)
 | 
						|
        super()._pre_launch()
 | 
						|
        self._qtest = QEMUQtestProtocol(sock=self._qtest_sock_pair[1])
 | 
						|
 | 
						|
    def _post_launch(self) -> None:
 | 
						|
        assert self._qtest is not None
 | 
						|
        super()._post_launch()
 | 
						|
        if self._qtest_sock_pair:
 | 
						|
            self._qtest_sock_pair[0].close()
 | 
						|
        self._qtest.connect()
 | 
						|
 | 
						|
    def _post_shutdown(self) -> None:
 | 
						|
        if self._qtest_sock_pair:
 | 
						|
            self._qtest_sock_pair[0].close()
 | 
						|
            self._qtest_sock_pair[1].close()
 | 
						|
            self._qtest_sock_pair = None
 | 
						|
        super()._post_shutdown()
 | 
						|
 | 
						|
    def qtest(self, cmd: str) -> str:
 | 
						|
        """
 | 
						|
        Send a qtest command to the guest.
 | 
						|
 | 
						|
        :param cmd: qtest command to send
 | 
						|
        :return: qtest server response
 | 
						|
        """
 | 
						|
        if self._qtest is None:
 | 
						|
            raise RuntimeError("qtest socket not available")
 | 
						|
        return self._qtest.cmd(cmd)
 |