python/aqmp: copy type definitions from qmp

Copy the remaining type definitions from QMP into the qemu.aqmp.legacy
module. Now, users that require the legacy interface don't need to
import anything else but qemu.aqmp.legacy wrapper.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: Beraldo Leal <bleal@redhat.com>
This commit is contained in:
John Snow
2022-01-10 18:28:47 -05:00
parent 3b5bf136f5
commit 0e6bfd8b96
2 changed files with 30 additions and 8 deletions

View File

@ -46,6 +46,10 @@ T = TypeVar('T')
_U = TypeVar('_U')
_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``
InternetAddrT = Tuple[str, int]
UnixAddrT = str
SocketAddrT = Union[UnixAddrT, InternetAddrT]
class Runstate(Enum):
"""Protocol session runstate."""
@ -257,7 +261,7 @@ class AsyncProtocol(Generic[T]):
@upper_half
@require(Runstate.IDLE)
async def accept(self, address: Union[str, Tuple[str, int]],
async def accept(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Accept a connection and begin processing message queues.
@ -275,7 +279,7 @@ class AsyncProtocol(Generic[T]):
@upper_half
@require(Runstate.IDLE)
async def connect(self, address: Union[str, Tuple[str, int]],
async def connect(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Connect to the server and begin processing message queues.
@ -337,7 +341,7 @@ class AsyncProtocol(Generic[T]):
@upper_half
async def _new_session(self,
address: Union[str, Tuple[str, int]],
address: SocketAddrT,
ssl: Optional[SSLContext] = None,
accept: bool = False) -> None:
"""
@ -397,7 +401,7 @@ class AsyncProtocol(Generic[T]):
@upper_half
async def _establish_connection(
self,
address: Union[str, Tuple[str, int]],
address: SocketAddrT,
ssl: Optional[SSLContext] = None,
accept: bool = False
) -> None:
@ -424,7 +428,7 @@ class AsyncProtocol(Generic[T]):
await self._do_connect(address, ssl)
@upper_half
async def _do_accept(self, address: Union[str, Tuple[str, int]],
async def _do_accept(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Acting as the transport server, accept a single connection.
@ -482,7 +486,7 @@ class AsyncProtocol(Generic[T]):
self.logger.debug("Connection accepted.")
@upper_half
async def _do_connect(self, address: Union[str, Tuple[str, int]],
async def _do_connect(self, address: SocketAddrT,
ssl: Optional[SSLContext] = None) -> None:
"""
Acting as the transport client, initiate a connection to a server.