[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v4 08/27] python/aqmp: add logging to AsyncProtocol
From: John Snow
Subject: [PATCH v4 08/27] python/aqmp: add logging to AsyncProtocol
Date: Wed, 15 Sep 2021 12:29:36 -0400
Give the connection and the reader/writer tasks nicknames, and add
logging statements throughout.
Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
---
python/qemu/aqmp/protocol.py | 82 ++++++++++++++++++++++++++++++++----
1 file changed, 73 insertions(+), 9 deletions(-)
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 19460857bd..1dfd12895d 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -14,6 +14,7 @@
from asyncio import StreamReader, StreamWriter
from enum import Enum
from functools import wraps
+import logging
from ssl import SSLContext
from typing import (
Any,
@@ -32,8 +33,10 @@
from .util import (
bottom_half,
create_task,
+ exception_summary,
flush,
is_closing,
+ pretty_traceback,
upper_half,
wait_closed,
)
@@ -174,14 +177,28 @@ class AsyncProtocol(Generic[T]):
can be written after the super() call.
- `_on_message`:
Actions to be performed when a message is received.
+
+ :param name:
+ Name used for logging messages, if any. By default, messages
+ will log to 'qemu.aqmp.protocol', but each individual connection
+ can be given its own logger by giving it a name; messages will
+ then log to 'qemu.aqmp.protocol.${name}'.
"""
# pylint: disable=too-many-instance-attributes
+ #: Logger object for debugging messages from this connection.
+ logger = logging.getLogger(__name__)
+
# -------------------------
# Section: Public interface
# -------------------------
- def __init__(self) -> None:
+ def __init__(self, name: Optional[str] = None) -> None:
+ #: The nickname for this connection, if any.
+ self.name: Optional[str] = name
+ if self.name is not None:
+ self.logger = self.logger.getChild(self.name)
+
# stream I/O
self._reader: Optional[StreamReader] = None
self._writer: Optional[StreamWriter] = None
@@ -205,6 +222,14 @@ def __init__(self) -> None:
self._runstate = Runstate.IDLE
self._runstate_changed: Optional[asyncio.Event] = None
+ def __repr__(self) -> str:
+ cls_name = type(self).__name__
+ tokens = []
+ if self.name is not None:
+ tokens.append(f"name={self.name!r}")
+ tokens.append(f"runstate={self.runstate.name}")
+ return f"<{cls_name} {' '.join(tokens)}>"
+
@property # @upper_half
def runstate(self) -> Runstate:
"""The current `Runstate` of the connection."""
@@ -246,6 +271,7 @@ async def disconnect(self) -> None:
:raise Exception: When the reader or writer terminate unexpectedly.
"""
+ self.logger.debug("disconnect() called.")
self._schedule_disconnect()
await self._wait_disconnect()
@@ -273,6 +299,8 @@ def _set_state(self, state: Runstate) -> None:
if state == self._runstate:
return
+ self.logger.debug("Transitioning from '%s' to '%s'.",
+ str(self._runstate), str(state))
self._runstate = state
self._runstate_event.set()
self._runstate_event.clear()
@@ -312,8 +340,15 @@ async def _new_session(self,
except BaseException as err:
emsg = f"Failed to establish {phase}"
- # Reset from CONNECTING back to IDLE.
- await self.disconnect()
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ try:
+ # Reset from CONNECTING back to IDLE.
+ await self.disconnect()
+ except:
+ emsg = "Unexpected bottom half exception"
+ self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
+ raise
# NB: CancelledError is not a BaseException before Python 3.8
if isinstance(err, asyncio.CancelledError):
@@ -363,12 +398,16 @@ async def _do_connect(self, address: Union[str, Tuple[str, int]],
:raise OSError: For stream-related errors.
"""
+ self.logger.debug("Connecting to %s ...", address)
+
if isinstance(address, tuple):
connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
else:
connect = asyncio.open_unix_connection(path=address, ssl=ssl)
self._reader, self._writer = await connect
+ self.logger.debug("Connected.")
+
@upper_half
async def _establish_session(self) -> None:
"""
@@ -382,8 +421,8 @@ async def _establish_session(self) -> None:
self._outgoing = asyncio.Queue()
- reader_coro = self._bh_loop_forever(self._bh_recv_message)
- writer_coro = self._bh_loop_forever(self._bh_send_message)
+ reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+ writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
self._reader_task = create_task(reader_coro)
self._writer_task = create_task(writer_coro)
@@ -410,6 +449,7 @@ def _schedule_disconnect(self) -> None:
"""
if not self._dc_task:
self._set_state(Runstate.DISCONNECTING)
+ self.logger.debug("Scheduling disconnect.")
self._dc_task = create_task(self._bh_disconnect())
@upper_half
@@ -492,30 +532,39 @@ def _done(task: Optional['asyncio.Future[Any]']) -> bool:
# Try to flush the writer, if possible:
if not error_pathway:
await self._bh_flush_writer()
- except:
+ except BaseException as err:
error_pathway = True
+ emsg = "Failed to flush the writer"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
raise
finally:
# Cancel any still-running tasks:
if self._writer_task is not None and not self._writer_task.done():
+ self.logger.debug("Cancelling writer task.")
self._writer_task.cancel()
if self._reader_task is not None and not self._reader_task.done():
+ self.logger.debug("Cancelling reader task.")
self._reader_task.cancel()
# Close out the tasks entirely (Won't raise):
if tasks:
+ self.logger.debug("Waiting for tasks to complete ...")
await asyncio.wait(tasks)
# Lastly, close the stream itself. (May raise):
await self._bh_close_stream(error_pathway)
+ self.logger.debug("Disconnected.")
@bottom_half
async def _bh_flush_writer(self) -> None:
if not self._writer_task:
return
+ self.logger.debug("Draining the outbound queue ...")
await self._outgoing.join()
if self._writer is not None:
+ self.logger.debug("Flushing the StreamWriter ...")
await flush(self._writer)
@bottom_half
@@ -525,8 +574,10 @@ async def _bh_close_stream(self, error_pathway: bool = False) -> None:
return
if not is_closing(self._writer):
+ self.logger.debug("Closing StreamWriter.")
self._writer.close()
+ self.logger.debug("Waiting for StreamWriter to close ...")
try:
await wait_closed(self._writer)
except Exception: # pylint: disable=broad-except
@@ -541,13 +592,18 @@ async def _bh_close_stream(self, error_pathway: bool = False) -> None:
# just trust that the Exception we already have is the
# better one to present to the user, even if we don't
# genuinely *know* the relationship between the two.
- pass
+ self.logger.debug(
+ "Discarding Exception from wait_closed:\n%s\n",
+ pretty_traceback(),
+ )
else:
# Oops, this is a brand-new error!
raise
+ finally:
+ self.logger.debug("StreamWriter closed.")
@bottom_half
- async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
+ async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
"""
Run one of the bottom-half methods in a loop forever.
@@ -555,16 +611,24 @@ async def _bh_loop_forever(self, async_fn: _TaskFN) -> None:
disconnect that will terminate the entire loop.
:param async_fn: The bottom-half method to run in a loop.
+ :param name: The name of this task, used for logging.
"""
try:
while True:
await async_fn()
except asyncio.CancelledError:
# We have been cancelled by _bh_disconnect, exit gracefully.
+ self.logger.debug("Task.%s: cancelled.", name)
return
- except BaseException:
+ except BaseException as err:
+ self.logger.error("Task.%s: %s",
+ name, exception_summary(err))
+ self.logger.debug("Task.%s: failure:\n%s\n",
+ name, pretty_traceback())
self._schedule_disconnect()
raise
+ finally:
+ self.logger.debug("Task.%s: exiting.", name)
@bottom_half
async def _bh_send_message(self) -> None:
--
2.31.1
- [PATCH v4 00/27] python: introduce Asynchronous QMP package, John Snow, 2021/09/15
- [PATCH v4 01/27] python/aqmp: add asynchronous QMP (AQMP) subpackage, John Snow, 2021/09/15
- [PATCH v4 02/27] python/aqmp: add error classes, John Snow, 2021/09/15
- [PATCH v4 03/27] python/pylint: Add exception for TypeVar names ('T'), John Snow, 2021/09/15
- [PATCH v4 04/27] python/aqmp: add asyncio compatibility wrappers, John Snow, 2021/09/15
- [PATCH v4 06/27] python/aqmp: add runstate state machine to AsyncProtocol, John Snow, 2021/09/15
- [PATCH v4 05/27] python/aqmp: add generic async message-based protocol support, John Snow, 2021/09/15
- [PATCH v4 07/27] python/aqmp: Add logging utility helpers, John Snow, 2021/09/15
- [PATCH v4 08/27] python/aqmp: add logging to AsyncProtocol, John Snow <=
- [PATCH v4 09/27] python/aqmp: add AsyncProtocol.accept() method, John Snow, 2021/09/15
- [PATCH v4 10/27] python/aqmp: add configurable read buffer limit, John Snow, 2021/09/15
- [PATCH v4 11/27] python/aqmp: add _cb_inbound and _cb_outbound logging hooks, John Snow, 2021/09/15
- [PATCH v4 12/27] python/aqmp: add AsyncProtocol._readline() method, John Snow, 2021/09/15
- [PATCH v4 13/27] python/aqmp: add QMP Message format, John Snow, 2021/09/15
- [PATCH v4 14/27] python/aqmp: add well-known QMP object models, John Snow, 2021/09/15
- [PATCH v4 16/27] python/pylint: disable too-many-function-args, John Snow, 2021/09/15
- [PATCH v4 17/27] python/aqmp: add QMP protocol support, John Snow, 2021/09/15
- [PATCH v4 19/27] python/aqmp: Add message routing to QMP protocol, John Snow, 2021/09/15
- [PATCH v4 20/27] python/aqmp: add execute() interfaces, John Snow, 2021/09/15