Class MessageReceiver
- Namespace
- Microsoft.Azure.ServiceBus.Core
- Assembly
- Microsoft.Azure.ServiceBus.dll
The MessageReceiver can be used to receive messages from Queues and Subscriptions and acknowledge them.
public class MessageReceiver : ClientEntity, IMessageReceiver, IReceiverClient, IClientEntity
- Inheritance
- ClientEntity
MessageReceiver
- Implements
- IMessageReceiver, IReceiverClient, IClientEntity
Examples
Create a new MessageReceiver to receive a message from a Subscription
IMessageReceiver messageReceiver = new MessageReceiver(
    namespaceConnectionString,
    EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName),
    ReceiveMode.PeekLock);
Receive a message from the Subscription.
var message = await messageReceiver.ReceiveAsync();
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
Remarks
The MessageReceiver provides advanced functionality that is not found in the QueueClient or SubscriptionClient. For instance, ReceiveAsync() lets you receive messages on demand, but it also requires you to renew locks manually using RenewLockAsync(Message). The receiver uses the AMQP protocol to communicate with the service.
Constructors
MessageReceiver(ServiceBusConnection, string, ReceiveMode, RetryPolicy, int)
Creates a new AMQP MessageReceiver on a given ServiceBusConnection.
public MessageReceiver(ServiceBusConnection serviceBusConnection, string entityPath, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)
Parameters
serviceBusConnection
ServiceBusConnection
Connection object to the Service Bus namespace.
entityPath
string
The path of the entity for this receiver. For queues this is the queue name; for subscriptions it is the subscription path. You can use EntityNameHelper.FormatSubscriptionPath(string, string) to help create this path.
receiveMode
ReceiveMode
The ReceiveMode used to specify how messages are received. Defaults to PeekLock mode.
retryPolicy
RetryPolicy
The RetryPolicy that will be used when communicating with Service Bus. Defaults to RetryPolicy.Default.
prefetchCount
int
The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.
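As an illustration of this constructor, the following is a minimal sketch of two receivers sharing one ServiceBusConnection. It assumes a namespace-level connection string; the entity names ("orders", "events", "audit") and the class name are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class SharedConnectionExample
{
    // namespaceConnectionString is assumed to contain no entity details.
    public static async Task RunAsync(string namespaceConnectionString)
    {
        var connection = new ServiceBusConnection(namespaceConnectionString);

        // Hypothetical entity names, for illustration only.
        var queueReceiver = new MessageReceiver(connection, "orders");
        var subscriptionReceiver = new MessageReceiver(
            connection,
            EntityNameHelper.FormatSubscriptionPath("events", "audit"),
            ReceiveMode.PeekLock,
            RetryPolicy.Default,
            prefetchCount: 10);

        try
        {
            // Wait up to five seconds for one message from the queue.
            Message message = await queueReceiver.ReceiveAsync(TimeSpan.FromSeconds(5));
            if (message != null)
            {
                await queueReceiver.CompleteAsync(message.SystemProperties.LockToken);
            }
        }
        finally
        {
            // The receivers do not own the shared connection, so it is closed explicitly.
            await queueReceiver.CloseAsync();
            await subscriptionReceiver.CloseAsync();
            await connection.CloseAsync();
        }
    }
}
```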
MessageReceiver(ServiceBusConnectionStringBuilder, ReceiveMode, RetryPolicy, int)
Creates a new MessageReceiver from a ServiceBusConnectionStringBuilder.
public MessageReceiver(ServiceBusConnectionStringBuilder connectionStringBuilder, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)
Parameters
connectionStringBuilder
ServiceBusConnectionStringBuilder
The ServiceBusConnectionStringBuilder having entity level connection details.
receiveMode
ReceiveMode
The ReceiveMode used to specify how messages are received. Defaults to PeekLock mode.
retryPolicy
RetryPolicy
The RetryPolicy that will be used when communicating with Service Bus. Defaults to RetryPolicy.Default.
prefetchCount
int
The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.
Remarks
Creates a new connection to the entity, which is opened during the first operation.
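A minimal sketch of this constructor, assuming an entity-level connection string (one that includes an EntityPath segment). The class and method names are hypothetical, and ReceiveAndDelete is chosen only to keep the example short.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class BuilderConstructorExample
{
    // entityConnectionString is assumed to include "EntityPath=<queue or subscription path>".
    public static async Task<Message> ReceiveOneAsync(string entityConnectionString)
    {
        var builder = new ServiceBusConnectionStringBuilder(entityConnectionString);
        var receiver = new MessageReceiver(builder, ReceiveMode.ReceiveAndDelete);
        try
        {
            // The connection is opened here, during the first operation.
            // The result is null if no message arrives before the operation times out.
            return await receiver.ReceiveAsync();
        }
        finally
        {
            await receiver.CloseAsync();
        }
    }
}
```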
MessageReceiver(string, string, ITokenProvider, TransportType, ReceiveMode, RetryPolicy, int)
Creates a new MessageReceiver from a specified endpoint, entity path, and token provider.
public MessageReceiver(string endpoint, string entityPath, ITokenProvider tokenProvider, TransportType transportType = TransportType.Amqp, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)
Parameters
endpoint
string
Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.windows.net.
entityPath
string
Path of the entity (queue or subscription) to receive from.
tokenProvider
ITokenProvider
Token provider which will generate security tokens for authorization.
transportType
TransportType
Transport type. Defaults to TransportType.Amqp.
receiveMode
ReceiveMode
The mode in which messages are received. Defaults to ReceiveMode.PeekLock.
retryPolicy
RetryPolicy
Retry policy for receive operations. Defaults to RetryPolicy.Default.
prefetchCount
int
The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.
Remarks
Creates a new connection to the entity, which is opened during the first operation.
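A minimal sketch of this constructor using a shared access signature token provider. The key name and key are assumed to come from the caller's configuration, and the class and method names are hypothetical.

```csharp
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus.Primitives;

public static class TokenProviderConstructorExample
{
    // endpoint: for example {yournamespace}.servicebus.windows.net, per the parameter description.
    // keyName and key: a shared access policy name and its key (assumed to be supplied by the caller).
    public static MessageReceiver Create(string endpoint, string entityPath, string keyName, string key)
    {
        ITokenProvider tokenProvider =
            TokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key);

        return new MessageReceiver(
            endpoint,
            entityPath,
            tokenProvider,
            TransportType.Amqp,
            ReceiveMode.PeekLock);
    }
}
```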
MessageReceiver(string, string, ReceiveMode, RetryPolicy, int)
Creates a new MessageReceiver from a specified connection string and entity path.
public MessageReceiver(string connectionString, string entityPath, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)
Parameters
connectionString
string
Namespace connection string used to communicate with Service Bus. Must not contain entity details.
entityPath
string
The path of the entity for this receiver. For queues this is the queue name; for subscriptions it is the subscription path. You can use EntityNameHelper.FormatSubscriptionPath(string, string) to help create this path.
receiveMode
ReceiveMode
The ReceiveMode used to specify how messages are received. Defaults to PeekLock mode.
retryPolicy
RetryPolicy
The RetryPolicy that will be used when communicating with Service Bus. Defaults to RetryPolicy.Default.
prefetchCount
int
The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.
Remarks
Creates a new connection to the entity, which is opened during the first operation.
Properties
LastPeekedSequenceNumber
Gets the sequence number of the last peeked message.
public long LastPeekedSequenceNumber { get; }
Property Value
long
OperationTimeout
Duration after which individual operations will time out.
public override TimeSpan OperationTimeout { get; set; }
Property Value
TimeSpan
Path
The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path.
public override string Path { get; }
Property Value
string
PrefetchCount
Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when and before the application asks for one using Receive. Setting a non-zero value prefetches PrefetchCount number of messages. Setting the value to zero turns prefetch off. Defaults to 0.
public int PrefetchCount { get; set; }
Property Value
int
Remarks
When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, than what the application immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immediate consumption that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.
While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is replenished in the background as space becomes available. If there are no messages available for delivery, the receive operation will drain the buffer and then wait or block as expected.
Prefetch also works equivalently with the RegisterMessageHandler(Func<Message, CancellationToken, Task>, Func<ExceptionReceivedEventArgs, Task>) APIs.
Updates to this value take effect on the next receive call to the service.
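The prefetch behavior described above can be sketched as follows. This is an illustrative example, not a tuning recommendation: the prefetch value of 20, the two-second wait, and the class and method names are assumptions.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class PrefetchExample
{
    // Receives and completes messages until none arrives within the wait time.
    public static async Task<int> DrainAsync(IMessageReceiver receiver)
    {
        // Takes effect on the next receive call to the service.
        receiver.PrefetchCount = 20;

        int processed = 0;
        while (true)
        {
            // Later calls are served from the local buffer while it holds messages.
            Message message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(2));
            if (message == null)
            {
                break; // nothing arrived within the wait time
            }

            await receiver.CompleteAsync(message.SystemProperties.LockToken);
            processed++;
        }

        return processed;
    }
}
```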
ReceiveMode
Gets the ReceiveMode of the current receiver.
public ReceiveMode ReceiveMode { get; protected set; }
Property Value
ReceiveMode
RegisteredPlugins
Gets a list of currently registered plugins.
public override IList<ServiceBusPlugin> RegisteredPlugins { get; }
Property Value
IList<ServiceBusPlugin>
ServiceBusConnection
Connection object to the service bus namespace.
public override ServiceBusConnection ServiceBusConnection { get; }
Property Value
ServiceBusConnection
Methods
AbandonAsync(string, IDictionary<string, object>)
Abandons a Message using a lock token. This will make the message available again for processing.
public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
Parameters
lockToken
string
The lock token of the corresponding message to abandon.
propertiesToModify
IDictionary<string, object>
The properties of the message to modify while abandoning the message.
Returns
Task
Remarks
A lock token can be found in Message.SystemProperties.LockToken, and only when ReceiveMode is set to PeekLock. Abandoning a message increases the delivery count on the message. This operation can only be performed on messages that were received by this receiver.
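A minimal sketch of the complete-or-abandon pattern in PeekLock mode. The process delegate and the "LastError" property name are hypothetical and stand in for application logic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class AbandonExample
{
    public static async Task ProcessOrAbandonAsync(
        IMessageReceiver receiver,
        Func<Message, Task> process)
    {
        Message message = await receiver.ReceiveAsync();
        if (message == null)
        {
            return; // no message arrived before the operation timed out
        }

        try
        {
            await process(message);
            await receiver.CompleteAsync(message.SystemProperties.LockToken);
        }
        catch (Exception ex)
        {
            // Record why processing failed, then release the lock so the
            // message becomes available again (its delivery count increases).
            var properties = new Dictionary<string, object>
            {
                ["LastError"] = ex.Message // hypothetical property name
            };
            await receiver.AbandonAsync(message.SystemProperties.LockToken, properties);
        }
    }
}
```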
CompleteAsync(IEnumerable<string>)
Completes a series of messages using a list of lock tokens. This will delete the messages from the service.
public Task CompleteAsync(IEnumerable<string> lockTokens)
Parameters
lockTokens
IEnumerable<string>
An IEnumerable<T> containing the lock tokens of the corresponding messages to complete.
Returns
Task
Remarks
A lock token can be found in Message.SystemProperties.LockToken, and only when ReceiveMode is set to PeekLock. This operation can only be performed on messages that were received by this receiver.
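A minimal sketch of completing a received batch in one call. The batch size of 10 and the class and method names are assumptions made for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class BatchCompleteExample
{
    // Returns the number of messages completed.
    public static async Task<int> ReceiveAndCompleteBatchAsync(IMessageReceiver receiver)
    {
        IList<Message> messages = await receiver.ReceiveAsync(10);
        if (messages == null || messages.Count == 0)
        {
            return 0;
        }

        // Application processing of the batch would go here.

        // Collect the lock tokens and complete the whole batch at once.
        List<string> lockTokens = messages
            .Select(m => m.SystemProperties.LockToken)
            .ToList();
        await receiver.CompleteAsync(lockTokens);

        return messages.Count;
    }
}
```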
CompleteAsync(string)
Completes a Message using its lock token. This will delete the message from the service.
public Task CompleteAsync(string lockToken)
Parameters
lockToken
string
The lock token of the corresponding message to complete.
Returns
Task
Remarks
A lock token can be found in Message.SystemProperties.LockToken, and only when ReceiveMode is set to PeekLock. This operation can only be performed on messages that were received by this receiver.
DeadLetterAsync(string, IDictionary<string, object>)
Moves a message to the deadletter sub-queue.
public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
Parameters
lockToken
string
The lock token of the corresponding message to deadletter.
propertiesToModify
IDictionary<string, object>
The properties of the message to modify while moving to sub-queue.
Returns
Task
Remarks
A lock token can be found in Message.SystemProperties.LockToken, and only when ReceiveMode is set to PeekLock. To receive a message from the deadletter queue, you need a new IMessageReceiver created with the corresponding path; you can use EntityNameHelper.FormatDeadLetterPath(string) to build it. This operation can only be performed on messages that were received by this receiver.
DeadLetterAsync(string, string, string)
Moves a message to the deadletter sub-queue.
public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)
Parameters
lockToken
string
The lock token of the corresponding message to deadletter.
deadLetterReason
string
The reason for deadlettering the message.
deadLetterErrorDescription
string
The error description for deadlettering the message.
Returns
Task
Remarks
A lock token can be found in Message.SystemProperties.LockToken, and only when ReceiveMode is set to PeekLock. To receive a message from the deadletter queue, you need a new IMessageReceiver created with the corresponding path; you can use EntityNameHelper.FormatDeadLetterPath(string) to build it. This operation can only be performed on messages that were received by this receiver.
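A minimal sketch of dead-lettering a message and then creating a second receiver for the deadletter sub-queue. The reason and description strings, and the class and method names, are hypothetical.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class DeadLetterExample
{
    // Moves a message received in PeekLock mode to the deadletter sub-queue.
    public static Task RejectAsync(IMessageReceiver receiver, Message message)
    {
        return receiver.DeadLetterAsync(
            message.SystemProperties.LockToken,
            "ValidationFailed",           // hypothetical reason
            "Body could not be parsed."); // hypothetical description
    }

    // Creates a separate receiver that reads from the deadletter sub-queue
    // of the given queue or subscription path.
    public static IMessageReceiver CreateDeadLetterReceiver(
        string namespaceConnectionString,
        string entityPath)
    {
        return new MessageReceiver(
            namespaceConnectionString,
            EntityNameHelper.FormatDeadLetterPath(entityPath),
            ReceiveMode.PeekLock);
    }
}
```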
DeferAsync(string, IDictionary<string, object>)
Indicates that the receiver wants to defer the processing for the message.
public Task DeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
Parameters
lockToken
string
The lock token of the Message.
propertiesToModify
IDictionary<string, object>
The properties of the message to modify while deferring the message.
Returns
Task
Remarks
A lock token can be found in Message.SystemProperties.LockToken, and only when ReceiveMode is set to PeekLock. To receive this message again in the future, you need to save its SequenceNumber and receive it using ReceiveDeferredMessageAsync(long). Deferring a message does not affect its expiration, so deferred messages can still expire. This operation can only be performed on messages that were received by this receiver.
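A minimal sketch of the defer-and-resume flow described above. How the sequence number is persisted between the two calls is left to the application; the class and method names are hypothetical.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class DeferExample
{
    // Defers the message and returns the sequence number the caller must keep.
    public static async Task<long> DeferForLaterAsync(IMessageReceiver receiver, Message message)
    {
        // Read the sequence number before deferring; it is the only way back to the message.
        long sequenceNumber = message.SystemProperties.SequenceNumber;
        await receiver.DeferAsync(message.SystemProperties.LockToken);
        return sequenceNumber;
    }

    // Receives the deferred message by sequence number and completes it.
    public static async Task ResumeAsync(IMessageReceiver receiver, long sequenceNumber)
    {
        Message deferred = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
        if (deferred != null)
        {
            // Application processing would go here.
            await receiver.CompleteAsync(deferred.SystemProperties.LockToken);
        }
    }
}
```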
OnAbandonAsync(string, IDictionary<string, object>)
protected virtual Task OnAbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
Parameters
lockToken
string
propertiesToModify
IDictionary<string, object>
Returns
Task
OnClosingAsync()
protected override Task OnClosingAsync()
Returns
Task
OnCompleteAsync(IEnumerable<string>)
protected virtual Task OnCompleteAsync(IEnumerable<string> lockTokens)
Parameters
lockTokens
IEnumerable<string>
Returns
Task
OnDeadLetterAsync(string, IDictionary<string, object>, string, string)
protected virtual Task OnDeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null, string deadLetterReason = null, string deadLetterErrorDescription = null)
Parameters
lockToken
string
propertiesToModify
IDictionary<string, object>
deadLetterReason
string
deadLetterErrorDescription
string
Returns
Task
OnDeferAsync(string, IDictionary<string, object>)
protected virtual Task OnDeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)
Parameters
lockToken
string
propertiesToModify
IDictionary<string, object>
Returns
Task
OnMessageHandler(MessageHandlerOptions, Func<Message, CancellationToken, Task>)
protected virtual void OnMessageHandler(MessageHandlerOptions registerHandlerOptions, Func<Message, CancellationToken, Task> callback)
Parameters
registerHandlerOptions
MessageHandlerOptions
callback
Func<Message, CancellationToken, Task>
OnPeekAsync(long, int)
protected virtual Task<IList<Message>> OnPeekAsync(long fromSequenceNumber, int messageCount = 1)
Parameters
fromSequenceNumber
long
messageCount
int
Returns
Task<IList<Message>>
OnReceiveAsync(int, TimeSpan)
protected virtual Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
Parameters
maxMessageCount
int
serverWaitTime
TimeSpan
Returns
Task<IList<Message>>
OnReceiveDeferredMessageAsync(long[])
protected virtual Task<IList<Message>> OnReceiveDeferredMessageAsync(long[] sequenceNumbers)
Parameters
sequenceNumbers
long[]
Returns
Task<IList<Message>>
OnRenewLockAsync(string)
protected virtual Task<DateTime> OnRenewLockAsync(string lockToken)
Parameters
lockToken
string
Returns
Task<DateTime>
PeekAsync()
Fetches the next active message without changing the state of the receiver or the message source.
public Task<Message> PeekAsync()
Returns
- Task<Message>
The Message that represents the next message to be read. Returns null when there is nothing to peek.
Remarks
The first call to PeekAsync() fetches the first active message for this receiver. Each subsequent call fetches the next message in the entity. Unlike a received message, a peeked message has no lock token associated with it, so it cannot be completed, abandoned, deferred, deadlettered, or renewed. Also, unlike ReceiveAsync(), this method fetches deferred messages as well (but not deadlettered messages).
PeekAsync(int)
Fetches the next batch of active messages without changing the state of the receiver or the message source.
public Task<IList<Message>> PeekAsync(int maxMessageCount)
Parameters
maxMessageCount
int
Returns
- Task<IList<Message>>
The list of Message instances that represent the next messages to be read. Returns null when there is nothing to peek.
Remarks
The first call to PeekAsync() fetches the first active message for this receiver. Each subsequent call fetches the next message in the entity. Unlike a received message, a peeked message has no lock token associated with it, so it cannot be completed, abandoned, deferred, deadlettered, or renewed. Also, unlike ReceiveAsync(), this method fetches deferred messages as well (but not deadlettered messages).
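A minimal sketch of browsing an entity with repeated PeekAsync() calls until null is returned. It is intended for small entities, since the loop is unbounded; the class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class PeekExample
{
    // Lists messages without locking or removing them; returns how many were seen.
    public static async Task<int> ListMessagesAsync(IMessageReceiver receiver)
    {
        int count = 0;
        Message message;

        // Each call advances to the next message; null means nothing is left to peek.
        while ((message = await receiver.PeekAsync()) != null)
        {
            Console.WriteLine(
                $"{message.SystemProperties.SequenceNumber}: {message.MessageId}");
            count++;
        }

        return count;
    }
}
```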
PeekBySequenceNumberAsync(long)
Asynchronously reads the next message without changing the state of the receiver or the message source.
public Task<Message> PeekBySequenceNumberAsync(long fromSequenceNumber)
Parameters
fromSequenceNumber
long
The sequence number from where to read the message.
Returns
- Task<Message>
The asynchronous operation that returns the Message that represents the next message to be read.
PeekBySequenceNumberAsync(long, int)
Peeks a batch of messages.
public Task<IList<Message>> PeekBySequenceNumberAsync(long fromSequenceNumber, int messageCount)
Parameters
fromSequenceNumber
long
The starting point from which to browse a batch of messages.
messageCount
int
The number of messages to retrieve.
Returns
Task<IList<Message>>
ReceiveAsync()
Receive a message from the entity defined by Path using ReceiveMode mode.
public Task<Message> ReceiveAsync()
Returns
Task<Message>
Remarks
The operation times out after the duration specified by OperationTimeout.
ReceiveAsync(int)
Receives a maximum of maxMessageCount messages from the entity defined by Path using ReceiveMode mode.
public Task<IList<Message>> ReceiveAsync(int maxMessageCount)
Parameters
maxMessageCount
int
The maximum number of messages that will be received.
Returns
Task<IList<Message>>
Remarks
Receiving fewer than maxMessageCount messages does not indicate that the entity is empty.
ReceiveAsync(int, TimeSpan)
Receives a maximum of maxMessageCount messages from the entity defined by Path using ReceiveMode mode.
public Task<IList<Message>> ReceiveAsync(int maxMessageCount, TimeSpan operationTimeout)
Parameters
maxMessageCount
int
The maximum number of messages that will be received.
operationTimeout
TimeSpan
The time span the client waits for receiving a message before it times out.
Returns
Task<IList<Message>>
Remarks
Receiving fewer than maxMessageCount messages does not indicate that the entity is empty.
The operationTimeout parameter includes the time taken by the receiver to establish a connection (either during the first receive or when the connection needs to be re-established). If establishing the connection times out, this method throws ServiceBusTimeoutException.
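A minimal sketch of a batch receive that treats both a connection timeout and an empty result as "no messages". The batch size of 50, the ten-second timeout, and the class and method names are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class BatchReceiveExample
{
    public static async Task<IList<Message>> TryReceiveBatchAsync(IMessageReceiver receiver)
    {
        try
        {
            IList<Message> batch =
                await receiver.ReceiveAsync(50, TimeSpan.FromSeconds(10));

            // A short or empty batch does not mean the entity is empty.
            return batch ?? new List<Message>();
        }
        catch (ServiceBusTimeoutException)
        {
            // Raised when establishing the connection exceeds operationTimeout.
            return new List<Message>();
        }
    }
}
```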
ReceiveAsync(TimeSpan)
Receive a message from the entity defined by Path using ReceiveMode mode.
public Task<Message> ReceiveAsync(TimeSpan operationTimeout)
Parameters
operationTimeout
TimeSpan
The time span the client waits for receiving a message before it times out.
Returns
Task<Message>
Remarks
The operationTimeout parameter includes the time taken by the receiver to establish a connection (either during the first receive or when the connection needs to be re-established). If establishing the connection times out, this method throws ServiceBusTimeoutException.
ReceiveDeferredMessageAsync(IEnumerable<long>)
Receives an IList<T> of deferred messages identified by sequenceNumbers.
public Task<IList<Message>> ReceiveDeferredMessageAsync(IEnumerable<long> sequenceNumbers)
Parameters
sequenceNumbers
IEnumerable<long>
An IEnumerable<T> containing the sequence numbers to receive.
Returns
- Task<IList<Message>>
Messages identified by sequence number are returned. Returns null if no messages are found. Throws if the messages have not been deferred.
ReceiveDeferredMessageAsync(long)
Receives a specific deferred message identified by sequenceNumber.
public Task<Message> ReceiveDeferredMessageAsync(long sequenceNumber)
Parameters
sequenceNumber
long
The sequence number of the message that will be received.
Returns
- Task<Message>
The message identified by sequenceNumber. Returns null if no such message is found. Throws if the message has not been deferred.
RegisterMessageHandler(Func<Message, CancellationToken, Task>, MessageHandlerOptions)
Receives messages continuously from the entity. Registers a message handler and starts a background message pump to receive messages. The handler (a Func<T1, T2, TResult>) is awaited each time a new message is received by the receiver.
public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions)
Parameters
handler
Func<Message, CancellationToken, Task>
A Func<T1, T2, TResult> that processes messages.
messageHandlerOptions
MessageHandlerOptions
The MessageHandlerOptions used to configure the settings of the pump.
Remarks
Enable prefetch to speed up the receive rate.
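A minimal sketch of registering a handler with MessageHandlerOptions. The option values shown (four concurrent calls, manual completion, five minutes of automatic lock renewal) and the class name are illustrative assumptions, not recommendations.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class MessageHandlerExample
{
    public static void Start(IMessageReceiver receiver)
    {
        var options = new MessageHandlerOptions(OnExceptionAsync)
        {
            MaxConcurrentCalls = 4,
            AutoComplete = false, // the handler completes messages itself
            MaxAutoRenewDuration = TimeSpan.FromMinutes(5)
        };

        receiver.RegisterMessageHandler(
            async (message, cancellationToken) =>
            {
                Console.WriteLine($"Received {message.MessageId}");
                await receiver.CompleteAsync(message.SystemProperties.LockToken);
            },
            options);
    }

    // Called by the pump when receiving or handling a message fails.
    private static Task OnExceptionAsync(ExceptionReceivedEventArgs args)
    {
        Console.Error.WriteLine(
            $"{args.ExceptionReceivedContext.Action} failed: {args.Exception.Message}");
        return Task.CompletedTask;
    }
}
```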
RegisterMessageHandler(Func<Message, CancellationToken, Task>, Func<ExceptionReceivedEventArgs, Task>)
Receives messages continuously from the entity. Registers a message handler and starts a background message pump to receive messages. The handler (a Func<T1, T2, TResult>) is awaited each time a new message is received by the receiver.
public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
Parameters
handler
Func<Message, CancellationToken, Task>
A Func<T1, T2, TResult> that processes messages.
exceptionReceivedHandler
Func<ExceptionReceivedEventArgs, Task>
A Func<T, TResult> that is used to notify exceptions.
RegisterPlugin(ServiceBusPlugin)
Registers a ServiceBusPlugin to be used with this receiver.
public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
Parameters
serviceBusPlugin
ServiceBusPlugin
RenewLockAsync(Message)
Renews the lock on the message specified by the lock token. The lock will be renewed based on the setting specified on the queue.
public Task RenewLockAsync(Message message)
Parameters
message
Message
Returns
Task
Remarks
When a message is received in PeekLock mode, the message is locked on the server for this receiver instance for the duration specified when the Queue/Subscription was created (LockDuration). If processing the message takes longer than this duration, the lock needs to be renewed. Each renewal resets the remaining lock time to the LockDuration set on the entity.
RenewLockAsync(string)
Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
public Task<DateTime> RenewLockAsync(string lockToken)
Parameters
lockToken
string
Lock token associated with the message.
Returns
Task<DateTime>
Remarks
When a message is received in PeekLock mode, the message is locked on the server for this receiver instance for the duration specified when the Queue/Subscription was created (LockDuration). If processing the message takes longer than this duration, the lock needs to be renewed. Each renewal resets the remaining lock time to the LockDuration set on the entity.
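A minimal sketch of keeping a lock alive while a long-running operation processes the message. The process delegate, the renewInterval value (assumed to be comfortably shorter than the entity's LockDuration), and the class and method names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public static class LockRenewalExample
{
    public static async Task ProcessWithRenewalAsync(
        IMessageReceiver receiver,
        Message message,
        Func<Message, Task> process,
        TimeSpan renewInterval)
    {
        using (var cts = new CancellationTokenSource())
        {
            Task renewal = RenewLoopAsync(receiver, message, renewInterval, cts.Token);
            try
            {
                await process(message);
            }
            finally
            {
                // Stop renewing before settling the message; a renewal failure
                // (for example, a lost lock) surfaces here.
                cts.Cancel();
                await renewal;
            }

            await receiver.CompleteAsync(message.SystemProperties.LockToken);
        }
    }

    private static async Task RenewLoopAsync(
        IMessageReceiver receiver,
        Message message,
        TimeSpan interval,
        CancellationToken token)
    {
        try
        {
            while (true)
            {
                await Task.Delay(interval, token);
                await receiver.RenewLockAsync(message);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when processing finishes and the loop is cancelled.
        }
    }
}
```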
UnregisterPlugin(string)
Unregisters a ServiceBusPlugin.
public override void UnregisterPlugin(string serviceBusPluginName)