diff --git a/.gitignore b/.gitignore index ef97309..6b8bf2d 100644 --- a/.gitignore +++ b/.gitignore @@ -110,4 +110,5 @@ ENV/ azure/mgmt/ azure/common/ azure/profiles/ +azure/servicebus/ features/steps/mgmt_settings_real.py diff --git a/HISTORY.rst b/HISTORY.rst index 89db3a3..d60b724 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,22 @@ Release History =============== +0.2.0 (2018-08-06) +++++++++++++++++++ + +- Stability improvements for EPH. +- Updated uAMQP version. +- Added new configuration options for Sender and Receiver: `keep_alive` and `auto_reconnect`. + These flags have been added to the following: + + - `EventHubClient.add_receiver` + - `EventHubClient.add_sender` + - `EventHubClientAsync.add_async_receiver` + - `EventHubClientAsync.add_async_sender` + - `EPHOptions.keep_alive_interval` + - `EPHOptions.auto_reconnect_on_error` + + 0.2.0rc2 (2018-07-29) +++++++++++++++++++++ diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py index 5acadea..ae780c2 100644 --- a/azure/eventhub/__init__.py +++ b/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -__version__ = "0.2.0rc2" +__version__ = "0.2.0" from azure.eventhub.common import EventData, EventHubError, Offset from azure.eventhub.client import EventHubClient diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 93a60c5..c4bcadf 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -77,6 +77,7 @@ async def _start_client_async(self, client): try: await client.open_async() except Exception as exp: # pylint: disable=broad-except + log.info("Encountered error while starting handler: {}".format(exp)) await client.close_async(exception=exp) async def _handle_redirect(self, redirects): @@ -164,7 +165,9 @@ async def get_eventhub_info_async(self): finally: await mgmt_client.close_async() - def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None, loop=None): + def add_async_receiver( + self, consumer_group, partition, offset=None, prefetch=300, + operation=None, keep_alive=30, auto_reconnect=True, loop=None): """ Add an async receiver to the client for a particular consumer group and partition. @@ -184,11 +187,15 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30 path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop) + handler = AsyncReceiver( + self, source_url, offset=offset, prefetch=prefetch, + keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop) self.clients.append(handler) return handler - def add_async_epoch_receiver(self, consumer_group, partition, epoch, prefetch=300, operation=None, loop=None): + def add_async_epoch_receiver( + self, consumer_group, partition, epoch, prefetch=300, + operation=None, keep_alive=30, auto_reconnect=True, loop=None): """ Add an async receiver to the client with an epoch value. 
Only a single epoch receiver can connect to a partition at any given time - additional epoch receivers must have @@ -211,11 +218,13 @@ def add_async_epoch_receiver(self, consumer_group, partition, epoch, prefetch=30 path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - handler = AsyncReceiver(self, source_url, prefetch=prefetch, epoch=epoch, loop=loop) + handler = AsyncReceiver( + self, source_url, prefetch=prefetch, epoch=epoch, + keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop) self.clients.append(handler) return handler - def add_async_sender(self, partition=None, operation=None, loop=None): + def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True, loop=None): """ Add an async sender to the client to send ~azure.eventhub.common.EventData object to an EventHub. @@ -232,6 +241,8 @@ def add_async_sender(self, partition=None, operation=None, loop=None): target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: target = target + operation - handler = AsyncSender(self, target, partition=partition, loop=loop) + handler = AsyncSender( + self, target, partition=partition, keep_alive=keep_alive, + auto_reconnect=auto_reconnect, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index 6ba2c3a..4cf315c 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -4,6 +4,8 @@ # -------------------------------------------------------------------------------------------- import asyncio +import uuid +import logging from uamqp import errors, types from uamqp import ReceiveClientAsync, Source @@ -12,13 +14,17 @@ from azure.eventhub.receiver import Receiver from azure.eventhub.common import _error_handler +log = logging.getLogger(__name__) + class AsyncReceiver(Receiver): """ Implements the async API of a Receiver. """ - def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called + def __init__( # pylint: disable=super-init-not-called + self, client, source, offset=None, prefetch=300, epoch=None, + keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an async receiver. 
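Reviewer note: a minimal usage sketch of the new async receiver flags added above. Only `add_async_receiver`, `run_async`, `stop_async`, `receive` and `Offset` are taken from this diff; the import path mirrors the `azure/eventhub/_async` module touched here, and `Offset("-1")` is assumed to mean "from the first available event", as described in the EPH docstring later in this patch.

```python
from azure.eventhub import Offset
# Module path taken from this diff (azure/eventhub/_async/__init__.py);
# the public alias for the async client may differ in your install.
from azure.eventhub._async import EventHubClientAsync


async def read_partition(client: EventHubClientAsync, partition: str = "0") -> int:
    """Drain one batch from a partition using the new 0.2.0 flags."""
    receiver = client.add_async_receiver(
        "$default", partition, offset=Offset("-1"), prefetch=300,
        keep_alive=30,          # ping the connection every 30s while idle
        auto_reconnect=True)    # retry retryable link detaches transparently
    await client.run_async()
    try:
        batch = await receiver.receive(max_batch_size=10, timeout=5)
        return len(batch)
    finally:
        await client.stop_async()
```

Since the new keywords default to `keep_alive=30` and `auto_reconnect=True`, existing callers keep the previous hard-coded 30-second ping behaviour without any code change.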
@@ -39,10 +45,14 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=N self.offset = offset self.prefetch = prefetch self.epoch = epoch + self.keep_alive = keep_alive + self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None self.properties = None + partition = self.source.split('/')[-1] + self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) source = Source(self.source) if self.offset is not None: source.set_filter(self.offset.selector()) @@ -56,7 +66,9 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=N link_properties=self.properties, timeout=self.timeout, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), loop=self.loop) async def open_async(self): @@ -85,7 +97,9 @@ async def open_async(self): link_properties=self.properties, timeout=self.timeout, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, + properties=self.client.create_properties(), loop=self.loop) await self._handler.open_async() while not await self.has_started(): @@ -110,7 +124,8 @@ async def reconnect_async(self): link_properties=self.properties, timeout=self.timeout, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties(), loop=self.loop) await self._handler.open_async() @@ -189,17 +204,27 @@ async def receive(self, max_batch_size=None, timeout=None): data_batch.append(event_data) return data_batch except (errors.LinkDetach, errors.ConnectionClose) as shutdown: - if shutdown.action.retry: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") await self.reconnect_async() return data_batch else: + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + await self.reconnect_async() + return data_batch + else: + log.info("AsyncReceiver detached. Shutting down.") error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except errors.MessageHandlerError: - await self.reconnect_async() - return data_batch except Exception as e: + log.info("Unexpected error occurred ({}). Shutting down.".format(e)) error = EventHubError("Receive failed: {}".format(e)) await self.close_async(exception=error) raise error diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index 42865f3..7dee78e 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +import uuid import asyncio from uamqp import constants, errors @@ -17,7 +18,7 @@ class AsyncSender(Sender): Implements the async API of a Sender. 
""" - def __init__(self, client, target, partition=None, loop=None): # pylint: disable=super-init-not-called + def __init__(self, client, target, partition=None, keep_alive=None, auto_reconnect=True, loop=None): # pylint: disable=super-init-not-called """ Instantiate an EventHub event SenderAsync handler. @@ -31,18 +32,23 @@ def __init__(self, client, target, partition=None, loop=None): # pylint: disabl self.client = client self.target = target self.partition = partition + self.keep_alive = keep_alive + self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.name = "EHSender-{}".format(uuid.uuid4()) self.redirected = None self.error = None if partition: self.target += "/Partitions/" + partition + self.name += "-partition{}".format(partition) self._handler = SendClientAsync( self.target, auth=self.client.get_auth(), debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties(), loop=self.loop) self._outcome = None @@ -65,7 +71,8 @@ async def open_async(self): debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties(), loop=self.loop) await self._handler.open_async() @@ -85,7 +92,8 @@ async def reconnect_async(self): debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties(), loop=self.loop) await self._handler.open_async() @@ -158,14 +166,19 @@ async def send(self, event_data): if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) except (errors.LinkDetach, errors.ConnectionClose) as shutdown: - if shutdown.action.retry: + if shutdown.action.retry and self.auto_reconnect: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: await self.reconnect_async() else: error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except errors.MessageHandlerError: - await self.reconnect_async() except Exception as e: error = EventHubError("Send failed: {}".format(e)) await self.close_async(exception=error) @@ -182,13 +195,18 @@ async def wait_async(self): try: await self._handler.wait_async() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: - if shutdown.action.retry: + if shutdown.action.retry and self.auto_reconnect: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: await self.reconnect_async() else: error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except errors.MessageHandlerError: - await self.reconnect_async() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 9c57cbd..08f07bd 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -303,7 +303,9 @@ def get_eventhub_info(self): 
finally: mgmt_client.close() - def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None): + def add_receiver( + self, consumer_group, partition, offset=None, prefetch=300, + operation=None, keep_alive=30, auto_reconnect=True): """ Add a receiver to the client for a particular consumer group and partition. @@ -323,11 +325,15 @@ def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, ope path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - handler = Receiver(self, source_url, offset=offset, prefetch=prefetch) + handler = Receiver( + self, source_url, offset=offset, prefetch=prefetch, + keep_alive=keep_alive, auto_reconnect=auto_reconnect) self.clients.append(handler) return handler - def add_epoch_receiver(self, consumer_group, partition, epoch, prefetch=300, operation=None): + def add_epoch_receiver( + self, consumer_group, partition, epoch, prefetch=300, + operation=None, keep_alive=30, auto_reconnect=True): """ Add a receiver to the client with an epoch value. Only a single epoch receiver can connect to a partition at any given time - additional epoch receivers must have @@ -350,11 +356,13 @@ def add_epoch_receiver(self, consumer_group, partition, epoch, prefetch=300, ope path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - handler = Receiver(self, source_url, prefetch=prefetch, epoch=epoch) + handler = Receiver( + self, source_url, prefetch=prefetch, epoch=epoch, + keep_alive=keep_alive, auto_reconnect=auto_reconnect) self.clients.append(handler) return handler - def add_sender(self, partition=None, operation=None): + def add_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True): """ Add a sender to the client to send ~azure.eventhub.common.EventData object to an EventHub. @@ -371,6 +379,6 @@ def add_sender(self, partition=None, operation=None): target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: target = target + operation - handler = Sender(self, target, partition=partition) + handler = Sender(self, target, partition=partition, keep_alive=keep_alive, auto_reconnect=auto_reconnect) self.clients.append(handler) return handler diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 49a15ce..90af41e 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -3,6 +3,8 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +import uuid + from uamqp import types, errors from uamqp import ReceiveClient, Source @@ -16,7 +18,7 @@ class Receiver: timeout = 0 _epoch = b'com.microsoft:epoch' - def __init__(self, client, source, offset=None, prefetch=300, epoch=None): + def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_alive=None, auto_reconnect=True): """ Instantiate a receiver. 
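Reviewer note: the synchronous client gains the same flags; a short sketch of the sender path follows. `add_sender`, `send` and `EventData` appear in this diff, while the `client.run()`/`client.stop()` lifecycle calls and the `EventData(bytes)` constructor are assumptions, mirroring the `run_async()`/`stop_async()` usage in the async tests below.

```python
from azure.eventhub import EventHubClient, EventData


def send_one(client: EventHubClient, payload: bytes = b"hello") -> None:
    """Send a single event through a pinned-partition sender."""
    sender = client.add_sender(
        partition="0",
        keep_alive=30,          # None would disable the background ping entirely
        auto_reconnect=True)
    client.run()                # assumed sync counterpart of run_async()
    try:
        sender.send(EventData(payload))   # EventData wraps the raw bytes payload
    finally:
        client.stop()           # assumed sync counterpart of stop_async()
```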
@@ -35,10 +37,14 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None): self.offset = offset self.prefetch = prefetch self.epoch = epoch + self.keep_alive = keep_alive + self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.properties = None self.redirected = None self.error = None + partition = self.source.split('/')[-1] + self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition) source = Source(self.source) if self.offset is not None: source.set_filter(self.offset.selector()) @@ -52,7 +58,8 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None): link_properties=self.properties, timeout=self.timeout, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties()) def open(self): @@ -81,7 +88,8 @@ def open(self): link_properties=self.properties, timeout=self.timeout, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties()) self._handler.open() while not self.has_started(): @@ -106,7 +114,8 @@ def reconnect(self): link_properties=self.properties, timeout=self.timeout, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties()) self._handler.open() while not self.has_started(): @@ -204,16 +213,21 @@ def receive(self, max_batch_size=None, timeout=None): data_batch.append(event_data) return data_batch except (errors.LinkDetach, errors.ConnectionClose) as shutdown: - if shutdown.action.retry: + if shutdown.action.retry and self.auto_reconnect: + self.reconnect() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: self.reconnect() return data_batch else: error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.MessageHandlerError: - self.reconnect() - return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index ff358d0..b59ed70 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -3,6 +3,8 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +import uuid + from uamqp import constants, errors from uamqp import SendClient @@ -15,7 +17,7 @@ class Sender: """ TIMEOUT = 60.0 - def __init__(self, client, target, partition=None): + def __init__(self, client, target, partition=None, keep_alive=None, auto_reconnect=True): """ Instantiate an EventHub event Sender handler. 
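Reviewer note: with `auto_reconnect=False`, the reworked handlers above close themselves and raise `EventHubError` instead of silently retrying, so recovery moves to the caller. A sketch of that pattern, with the same `client.run()`/`client.stop()` assumption as in the sender sketch:

```python
from azure.eventhub import EventHubClient, EventHubError, Offset


def receive_once(client: EventHubClient, partition: str = "0"):
    """One receive pass with reconnect-on-error disabled."""
    receiver = client.add_receiver(
        "$default", partition, offset=Offset("-1"),
        keep_alive=None,          # no connection pinging
        auto_reconnect=False)     # surface retryable detaches to the caller
    client.run()                  # assumed, as in the sender sketch above
    try:
        return receiver.receive(max_batch_size=100, timeout=10)
    except EventHubError as err:
        # The receiver has already closed itself before raising, so it must
        # be re-created rather than reused after this point.
        print("Receive failed: {}".format(err))
        return []
    finally:
        client.stop()
```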
@@ -29,16 +31,21 @@ def __init__(self, client, target, partition=None): self.partition = partition self.redirected = None self.error = None + self.keep_alive = keep_alive + self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.name = "EHSender-{}".format(uuid.uuid4()) if partition: self.target += "/Partitions/" + partition + self.name += "-partition{}".format(partition) self._handler = SendClient( self.target, auth=self.client.get_auth(), debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties()) self._outcome = None self._condition = None @@ -60,7 +67,8 @@ def open(self): debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties()) self._handler.open() while not self.has_started(): @@ -79,7 +87,8 @@ def reconnect(self): debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, - keep_alive_interval=30, + keep_alive_interval=self.keep_alive, + client_name=self.name, properties=self.client.create_properties()) self._handler.open() self._handler._pending_messages = unsent_events @@ -165,14 +174,19 @@ def send(self, event_data): self.close(exception=error) raise error except (errors.LinkDetach, errors.ConnectionClose) as shutdown: - if shutdown.action.retry: + if shutdown.action.retry and self.auto_reconnect: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: self.reconnect() else: error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.MessageHandlerError: - self.reconnect() except Exception as e: error = EventHubError("Send failed: {}".format(e)) self.close(exception=error) @@ -207,14 +221,19 @@ def wait(self): try: self._handler.wait() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: - if shutdown.action.retry: + if shutdown.action.retry and self.auto_reconnect: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: self.reconnect() else: error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.MessageHandlerError: - self.reconnect() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py index a631fdb..001eafa 100644 --- a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py +++ b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py @@ -131,7 +131,7 @@ async def update_checkpoint_async(self, lease, checkpoint): new_lease.with_source(lease) new_lease.offset = checkpoint.offset new_lease.sequence_number = checkpoint.sequence_number - await self.update_lease_async(new_lease) + return await self.update_lease_async(new_lease) async def delete_checkpoint_async(self, partition_id): """ @@ -331,7 +331,7 @@ async def acquire_lease_async(self, lease): lease.owner = self.host.host_name lease.increment_epoch() # check if this solves the issue - 
await self.update_lease_async(lease) + retval = await self.update_lease_async(lease) except Exception as err: # pylint: disable=broad-except _logger.error("Failed to acquire lease {!r} {} {}".format( err, partition_id, lease.token)) @@ -361,7 +361,7 @@ async def renew_lease_async(self, lease): timeout=self.lease_duration)) except Exception as err: # pylint: disable=broad-except if "LeaseIdMismatchWithLeaseOperation" in str(err): - _logger.info("LeaseLost") + _logger.info("LeaseLost on partition {}".format(lease.partition_id)) else: _logger.error("Failed to renew lease on partition {} with token {} {!r}".format( lease.partition_id, lease.token, err)) diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 4801a25..368c2bb 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -73,6 +73,8 @@ async def open_clients_async(self): self.partition_context.partition_id, Offset(self.partition_context.offset), prefetch=self.host.eph_options.prefetch_count, + keep_alive=self.host.eph_options.keep_alive_interval, + auto_reconnect=self.host.eph_options.auto_reconnect_on_error, loop=self.loop) self.partition_receiver = PartitionReceiver(self) @@ -95,7 +97,12 @@ async def on_closing_async(self, reason): :type reason: str """ self.partition_receiver.eh_partition_pump.set_pump_status("Errored") - await self.running + try: + await self.running + except TypeError: + _logger.debug("No partition pump running.") + except Exception as err: # pylint: disable=broad-except + _logger.info("Error on closing partition pump: {!r}".format(err)) await self.clean_up_clients_async() @@ -121,6 +128,7 @@ async def run(self): max_batch_size=self.max_batch_size, timeout=self.recieve_timeout) except Exception as e: # pylint: disable=broad-except + _logger.info("Error raised while attempting to receive messages: {}".format(e)) await self.process_error_async(e) else: if not msgs: diff --git a/azure/eventprocessorhost/eph.py b/azure/eventprocessorhost/eph.py index 7c7541e..2e464e1 100644 --- a/azure/eventprocessorhost/eph.py +++ b/azure/eventprocessorhost/eph.py @@ -64,6 +64,38 @@ async def close_async(self): class EPHOptions: """ Class that contains default and overidable EPH option. + + :ivar max_batch_size: The maximum number of events retrieved for processing + at a time. This value must be less than or equal to the prefetch count. The actual + number of events returned for processing may be any number up to the maximum. + The default value is 10. + :vartype max_batch_size: int + :ivar prefetch_count: The number of events to fetch from the service in advance of + processing. The default value is 300. + :vartype prefetch_count: int + :ivar receive_timeout: The length of time a single partition receiver will wait in + order to receive a batch of events. Default is 60 seconds. + :vartype receive_timeout: int + :ivar release_pump_on_timeout: Whether to shutdown an individual partition receiver if + no events were received in the specified timeout. Shutting down the pump will release + the lease to allow it to be picked up by another host. Default is False. + :vartype release_pump_on_timeout: bool + :ivar initial_offset_provider: The initial event offset to receive from if no persisted + offset is found. Default is "-1" (i.e. from the first event available). + :vartype initial_offset_provider: str + :ivar debug_trace: Whether to emit the network traffic in the logs. 
In order to view + these events the logger must be configured to track "uamqp". Default is False. + :vartype debug_trace: bool + :ivar http_proxy: HTTP proxy configuration. This should be a dictionary with + the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional + keys are 'username' and 'password'. + :vartype http_proxy: dict + :ivar keep_alive_interval: The time in seconds between asynchronously pinging a receiver + connection to keep it alive during inactivity. Default is None - i.e. no connection pinging. + :vartype keep_alive_interval: int + :ivar auto_reconnect_on_error: Whether to automatically attempt to reconnect a receiver + connection if it is detached from the service with a retryable error. Default is True. + :vartype auto_reconnect_on_error: bool """ def __init__(self): @@ -74,3 +106,5 @@ def __init__(self): self.initial_offset_provider = "-1" self.debug_trace = False self.http_proxy = None + self.keep_alive_interval = None + self.auto_reconnect_on_error = True diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index b21514b..33cc566 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -115,7 +115,9 @@ async def persist_checkpoint_async(self, checkpoint): _logger.info("persisting checkpoint {}".format(checkpoint.__dict__)) await self.host.storage_manager.create_checkpoint_if_not_exists_async(checkpoint.partition_id) - await self.host.storage_manager.update_checkpoint_async(self.lease, checkpoint) + if not await self.host.storage_manager.update_checkpoint_async(self.lease, checkpoint): + _logger.error("Failed to persist checkpoint for partition: {}".format(self.partition_id)) + raise Exception("failed to persist checkpoint") self.lease.offset = checkpoint.offset self.lease.sequence_number = checkpoint.sequence_number else: diff --git a/azure/eventprocessorhost/partition_manager.py b/azure/eventprocessorhost/partition_manager.py index 7025fe0..c9c927c 100644 --- a/azure/eventprocessorhost/partition_manager.py +++ b/azure/eventprocessorhost/partition_manager.py @@ -190,6 +190,7 @@ async def run_loop_async(self): self.host.guid, partition_id)) await self.check_and_add_pump_async(partition_id, updated_lease) else: + _logger.debug("Removing pump due to lost lease.") await self.remove_pump_async(partition_id, "LeaseLost") except Exception as err: # pylint: disable=broad-except _logger.error("Failed to update lease {!r}".format(err)) diff --git a/setup.py b/setup.py index df46435..8efb8aa 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ zip_safe=False, packages=find_packages(exclude=["examples", "tests"]), install_requires=[ - 'uamqp~=0.2.0', + 'uamqp>=0.2.1,<0.3.0', 'msrestazure~=0.4.11', 'azure-common~=1.1', 'azure-storage~=0.36.0' diff --git a/tests/test_iothub_receive_async.py b/tests/test_iothub_receive_async.py index d26c00c..66bea90 100644 --- a/tests/test_iothub_receive_async.py +++ b/tests/test_iothub_receive_async.py @@ -18,9 +18,7 @@ async def pump(receiver, sleep=None): if sleep: await asyncio.sleep(sleep) batch = await receiver.receive(timeout=1) - while batch: - messages += len(batch) - batch = await receiver.receive(timeout=1) + messages += len(batch) return messages @@ -42,11 +40,11 @@ async def test_iothub_receive_multiple_async(iot_connection_str): try: receivers = [] for p in partitions: - receivers.append(client.add_async_receiver("$default", p, prefetch=1000, operation='/messages/events')) + 
receivers.append(client.add_async_receiver("$default", p, prefetch=10, operation='/messages/events')) await client.run_async() outputs = await asyncio.gather(*[pump(r) for r in receivers]) - assert isinstance(outputs[0], int) and outputs[0] == 0 - assert isinstance(outputs[1], int) and outputs[1] == 0 + assert isinstance(outputs[0], int) and outputs[0] <= 10 + assert isinstance(outputs[1], int) and outputs[1] <= 10 finally: await client.stop_async()
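Reviewer note: the matching EPH-side switches are plain attributes on `EPHOptions` (see `eph.py` above), which `eh_partition_pump.py` now forwards to `add_async_receiver`. A configuration sketch follows; the `EventProcessorHost` wiring in the trailing comment is hypothetical, since its constructor is not part of this diff.

```python
# Module path as in this diff (azure/eventprocessorhost/eph.py).
from azure.eventprocessorhost.eph import EPHOptions

eph_options = EPHOptions()
# Existing knobs keep their defaults (see __init__ above); only the new
# 0.2.0 options are set explicitly here.
eph_options.keep_alive_interval = 30        # ping idle receiver connections every 30s
eph_options.auto_reconnect_on_error = True  # reconnect on retryable detach errors

# Hypothetical wiring - EventProcessorHost's constructor is not shown in this
# diff, so the argument names below are illustrative only:
# host = EventProcessorHost(MyEventProcessor, eh_config, storage_manager,
#                           eph_options=eph_options)
```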