Table of Contents

Class MessageReceiver

Namespace
Microsoft.Azure.ServiceBus.Core
Assembly
Microsoft.Azure.ServiceBus.dll

The MessageReceiver can be used to receive messages from Queues and Subscriptions and acknowledge them.

public class MessageReceiver : ClientEntity, IMessageReceiver, IReceiverClient, IClientEntity
Inheritance
MessageReceiver
Implements
Inherited Members

Examples

Create a new MessageReceiver to receive a message from a Subscription

IMessageReceiver messageReceiver = new MessageReceiver(
    namespaceConnectionString,
    EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName),
    ReceiveMode.PeekLock);

Receive a message from the Subscription.

var message = await messageReceiver.ReceiveAsync();
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);

Remarks

The MessageReceiver provides advanced functionality that is not found in the QueueClient or SubscriptionClient. For instance, ReceiveAsync(), which allows you to receive messages on demand, but also requires you to manually renew locks using RenewLockAsync(Message). It uses AMQP protocol to communicate with service.

Constructors

MessageReceiver(ServiceBusConnection, string, ReceiveMode, RetryPolicy, int)

Creates a new AMQP MessageReceiver on a given ServiceBusConnection

public MessageReceiver(ServiceBusConnection serviceBusConnection, string entityPath, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)

Parameters

serviceBusConnection ServiceBusConnection

Connection object to the service bus namespace.

entityPath string

The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path. You can use FormatSubscriptionPath(string, string), to help create this path.

receiveMode ReceiveMode

The ReceiveMode used to specify how messages are received. Defaults to PeekLock mode.

retryPolicy RetryPolicy

The RetryPolicy that will be used when communicating with Service Bus. Defaults to Default

prefetchCount int

The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.

MessageReceiver(ServiceBusConnectionStringBuilder, ReceiveMode, RetryPolicy, int)

Creates a new MessageReceiver from a ServiceBusConnectionStringBuilder.

public MessageReceiver(ServiceBusConnectionStringBuilder connectionStringBuilder, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)

Parameters

connectionStringBuilder ServiceBusConnectionStringBuilder

The ServiceBusConnectionStringBuilder having entity level connection details.

receiveMode ReceiveMode

The ReceiveMode used to specify how messages are received. Defaults to PeekLock mode.

retryPolicy RetryPolicy

The RetryPolicy that will be used when communicating with Service Bus. Defaults to Default.

prefetchCount int

The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.

Remarks

Creates a new connection to the entity, which is opened during the first operation.

MessageReceiver(string, string, ITokenProvider, TransportType, ReceiveMode, RetryPolicy, int)

Creates a new MessageReceiver from a specified endpoint, entity path, and token provider.

public MessageReceiver(string endpoint, string entityPath, ITokenProvider tokenProvider, TransportType transportType = TransportType.Amqp, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)

Parameters

endpoint string

Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.windows.net

entityPath string

Queue path.

tokenProvider ITokenProvider

Token provider which will generate security tokens for authorization.

transportType TransportType

Transport type.

receiveMode ReceiveMode

Mode of receive of messages. Defaults to ReceiveMode.PeekLock.

retryPolicy RetryPolicy

Retry policy for queue operations. Defaults to Default

prefetchCount int

The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.

Remarks

Creates a new connection to the entity, which is opened during the first operation.

MessageReceiver(string, string, ReceiveMode, RetryPolicy, int)

Creates a new MessageReceiver from a specified connection string and entity path.

public MessageReceiver(string connectionString, string entityPath, ReceiveMode receiveMode = ReceiveMode.PeekLock, RetryPolicy retryPolicy = null, int prefetchCount = 0)

Parameters

connectionString string

Namespace connection string used to communicate with Service Bus. Must not contain Entity details.

entityPath string

The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path. You can use FormatSubscriptionPath(string, string), to help create this path.

receiveMode ReceiveMode

The ReceiveMode used to specify how messages are received. Defaults to PeekLock mode.

retryPolicy RetryPolicy

The RetryPolicy that will be used when communicating with Service Bus. Defaults to Default

prefetchCount int

The PrefetchCount that specifies the upper limit of messages this receiver will actively receive regardless of whether a receive operation is pending. Defaults to 0.

Remarks

Creates a new connection to the entity, which is opened during the first operation.

Properties

LastPeekedSequenceNumber

Gets the sequence number of the last peeked message.

public long LastPeekedSequenceNumber { get; }

Property Value

long
See Also

OperationTimeout

Duration after which individual operations will timeout.

public override TimeSpan OperationTimeout { get; set; }

Property Value

TimeSpan

Path

The path of the entity for this receiver. For Queues this will be the name, but for Subscriptions this will be the path.

public override string Path { get; }

Property Value

string

PrefetchCount

Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when and before the application asks for one using Receive. Setting a non-zero value prefetches PrefetchCount number of messages. Setting the value to zero turns prefetch off. Defaults to 0.

public int PrefetchCount { get; set; }

Property Value

int

Remarks

When Prefetch is enabled, the receiver will quietly acquire more messages, up to the PrefetchCount limit, than what the application immediately asks for. A single initial Receive/ReceiveAsync call will therefore acquire a message for immediate consumption that will be returned as soon as available, and the client will proceed to acquire further messages to fill the prefetch buffer in the background.

While messages are available in the prefetch buffer, any subsequent ReceiveAsync calls will be immediately satisfied from the buffer, and the buffer is replenished in the background as space becomes available.If there are no messages available for delivery, the receive operation will drain the buffer and then wait or block as expected.

Prefetch also works equivalently with the RegisterMessageHandler(Func<Message, CancellationToken, Task>, Func<ExceptionReceivedEventArgs, Task>) APIs.

Updates to this value take effect on the next receive call to the service.

ReceiveMode

Gets the ReceiveMode of the current receiver.

public ReceiveMode ReceiveMode { get; protected set; }

Property Value

ReceiveMode

RegisteredPlugins

Gets a list of currently registered plugins.

public override IList<ServiceBusPlugin> RegisteredPlugins { get; }

Property Value

IList<ServiceBusPlugin>

ServiceBusConnection

Connection object to the service bus namespace.

public override ServiceBusConnection ServiceBusConnection { get; }

Property Value

ServiceBusConnection

Methods

AbandonAsync(string, IDictionary<string, object>)

Abandons a Message using a lock token. This will make the message available again for processing.

public Task AbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)

Parameters

lockToken string

The lock token of the corresponding message to abandon.

propertiesToModify IDictionary<string, object>

The properties of the message to modify while abandoning the message.

Returns

Task

Remarks

A lock token can be found in LockToken, only when ReceiveMode is set to PeekLock. Abandoning a message will increase the delivery count on the message. This operation can only be performed on messages that were received by this receiver.

CompleteAsync(IEnumerable<string>)

Completes a series of Message using a list of lock tokens. This will delete the message from the service.

public Task CompleteAsync(IEnumerable<string> lockTokens)

Parameters

lockTokens IEnumerable<string>

An IEnumerable<T> containing the lock tokens of the corresponding messages to complete.

Returns

Task

Remarks

A lock token can be found in LockToken, only when ReceiveMode is set to PeekLock. This operation can only be performed on messages that were received by this receiver.

CompleteAsync(string)

Completes a Message using its lock token. This will delete the message from the service.

public Task CompleteAsync(string lockToken)

Parameters

lockToken string

The lock token of the corresponding message to complete.

Returns

Task

Remarks

A lock token can be found in LockToken, only when ReceiveMode is set to PeekLock. This operation can only be performed on messages that were received by this receiver.

DeadLetterAsync(string, IDictionary<string, object>)

Moves a message to the deadletter sub-queue.

public Task DeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)

Parameters

lockToken string

The lock token of the corresponding message to deadletter.

propertiesToModify IDictionary<string, object>

The properties of the message to modify while moving to sub-queue.

Returns

Task

Remarks

A lock token can be found in LockToken, only when ReceiveMode is set to PeekLock. In order to receive a message from the deadletter queue, you will need a new IMessageReceiver, with the corresponding path. You can use FormatDeadLetterPath(string) to help with this. This operation can only be performed on messages that were received by this receiver.

DeadLetterAsync(string, string, string)

Moves a message to the deadletter sub-queue.

public Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null)

Parameters

lockToken string

The lock token of the corresponding message to deadletter.

deadLetterReason string

The reason for deadlettering the message.

deadLetterErrorDescription string

The error description for deadlettering the message.

Returns

Task

Remarks

A lock token can be found in LockToken, only when ReceiveMode is set to PeekLock. In order to receive a message from the deadletter queue, you will need a new IMessageReceiver, with the corresponding path. You can use FormatDeadLetterPath(string) to help with this. This operation can only be performed on messages that were received by this receiver.

DeferAsync(string, IDictionary<string, object>)

Indicates that the receiver wants to defer the processing for the message.

public Task DeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)

Parameters

lockToken string

The lock token of the Message.

propertiesToModify IDictionary<string, object>

The properties of the message to modify while deferring the message.

Returns

Task

Remarks

A lock token can be found in LockToken, only when ReceiveMode is set to PeekLock. In order to receive this message again in the future, you will need to save the SequenceNumber and receive it using ReceiveDeferredMessageAsync(long). Deferring messages does not impact message's expiration, meaning that deferred messages can still expire. This operation can only be performed on messages that were received by this receiver.

OnAbandonAsync(string, IDictionary<string, object>)

protected virtual Task OnAbandonAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)

Parameters

lockToken string
propertiesToModify IDictionary<string, object>

Returns

Task

OnClosingAsync()

protected override Task OnClosingAsync()

Returns

Task

OnCompleteAsync(IEnumerable<string>)

protected virtual Task OnCompleteAsync(IEnumerable<string> lockTokens)

Parameters

lockTokens IEnumerable<string>

Returns

Task

OnDeadLetterAsync(string, IDictionary<string, object>, string, string)

protected virtual Task OnDeadLetterAsync(string lockToken, IDictionary<string, object> propertiesToModify = null, string deadLetterReason = null, string deadLetterErrorDescription = null)

Parameters

lockToken string
propertiesToModify IDictionary<string, object>
deadLetterReason string
deadLetterErrorDescription string

Returns

Task

OnDeferAsync(string, IDictionary<string, object>)

protected virtual Task OnDeferAsync(string lockToken, IDictionary<string, object> propertiesToModify = null)

Parameters

lockToken string
propertiesToModify IDictionary<string, object>

Returns

Task

OnMessageHandler(MessageHandlerOptions, Func<Message, CancellationToken, Task>)

protected virtual void OnMessageHandler(MessageHandlerOptions registerHandlerOptions, Func<Message, CancellationToken, Task> callback)

Parameters

registerHandlerOptions MessageHandlerOptions
callback Func<Message, CancellationToken, Task>

OnPeekAsync(long, int)

protected virtual Task<IList<Message>> OnPeekAsync(long fromSequenceNumber, int messageCount = 1)

Parameters

fromSequenceNumber long
messageCount int

Returns

Task<IList<Message>>

OnReceiveAsync(int, TimeSpan)

protected virtual Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)

Parameters

maxMessageCount int
serverWaitTime TimeSpan

Returns

Task<IList<Message>>

OnReceiveDeferredMessageAsync(long[])

protected virtual Task<IList<Message>> OnReceiveDeferredMessageAsync(long[] sequenceNumbers)

Parameters

sequenceNumbers long[]

Returns

Task<IList<Message>>

OnRenewLockAsync(string)

protected virtual Task<DateTime> OnRenewLockAsync(string lockToken)

Parameters

lockToken string

Returns

Task<DateTime>

PeekAsync()

Fetches the next active message without changing the state of the receiver or the message source.

public Task<Message> PeekAsync()

Returns

Task<Message>

The Message that represents the next message to be read. Returns null when nothing to peek.

Remarks

The first call to PeekAsync() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity. Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot be Completed/Abandoned/Deferred/Deadlettered/Renewed. Also, unlike ReceiveAsync(), this method will fetch even Deferred messages (but not Deadlettered message)

PeekAsync(int)

Fetches the next batch of active messages without changing the state of the receiver or the message source.

public Task<IList<Message>> PeekAsync(int maxMessageCount)

Parameters

maxMessageCount int

Returns

Task<IList<Message>>

List of Message that represents the next message to be read. Returns null when nothing to peek.

Remarks

The first call to PeekAsync() fetches the first active message for this receiver. Each subsequent call fetches the subsequent message in the entity. Unlike a received message, peeked message will not have lock token associated with it, and hence it cannot be Completed/Abandoned/Deferred/Deadlettered/Renewed. Also, unlike ReceiveAsync(), this method will fetch even Deferred messages (but not Deadlettered message)

PeekBySequenceNumberAsync(long)

Asynchronously reads the next message without changing the state of the receiver or the message source.

public Task<Message> PeekBySequenceNumberAsync(long fromSequenceNumber)

Parameters

fromSequenceNumber long

The sequence number from where to read the message.

Returns

Task<Message>

The asynchronous operation that returns the Message that represents the next message to be read.

PeekBySequenceNumberAsync(long, int)

Peeks a batch of messages.

public Task<IList<Message>> PeekBySequenceNumberAsync(long fromSequenceNumber, int messageCount)

Parameters

fromSequenceNumber long

The starting point from which to browse a batch of messages.

messageCount int

The number of messages to retrieve.

Returns

Task<IList<Message>>

A batch of messages peeked.

ReceiveAsync()

Receive a message from the entity defined by Path using ReceiveMode mode.

public Task<Message> ReceiveAsync()

Returns

Task<Message>

The message received. Returns null if no message is found.

Remarks

Operation will time out after duration of OperationTimeout

ReceiveAsync(int)

Receives a maximum of maxMessageCount messages from the entity defined by Path using ReceiveMode mode.

public Task<IList<Message>> ReceiveAsync(int maxMessageCount)

Parameters

maxMessageCount int

The maximum number of messages that will be received.

Returns

Task<IList<Message>>

List of messages received. Returns null if no message is found.

Remarks

Receiving less than maxMessageCount messages is not an indication of empty entity.

ReceiveAsync(int, TimeSpan)

Receives a maximum of maxMessageCount messages from the entity defined by Path using ReceiveMode mode.

public Task<IList<Message>> ReceiveAsync(int maxMessageCount, TimeSpan operationTimeout)

Parameters

maxMessageCount int

The maximum number of messages that will be received.

operationTimeout TimeSpan

The time span the client waits for receiving a message before it times out.

Returns

Task<IList<Message>>

List of messages received. Returns null if no message is found.

Remarks

Receiving less than maxMessageCount messages is not an indication of empty entity. The parameter operationTimeout includes the time taken by the receiver to establish a connection (either during the first receive or when connection needs to be re-established). If establishing the connection times out, this will throw ServiceBusTimeoutException.

ReceiveAsync(TimeSpan)

Receive a message from the entity defined by Path using ReceiveMode mode.

public Task<Message> ReceiveAsync(TimeSpan operationTimeout)

Parameters

operationTimeout TimeSpan

The time span the client waits for receiving a message before it times out.

Returns

Task<Message>

The message received. Returns null if no message is found.

Remarks

The parameter operationTimeout includes the time taken by the receiver to establish a connection (either during the first receive or when connection needs to be re-established). If establishing the connection times out, this will throw ServiceBusTimeoutException.

ReceiveDeferredMessageAsync(IEnumerable<long>)

Receives a IList<T> of deferred messages identified by sequenceNumbers.

public Task<IList<Message>> ReceiveDeferredMessageAsync(IEnumerable<long> sequenceNumbers)

Parameters

sequenceNumbers IEnumerable<long>

An IEnumerable<T> containing the sequence numbers to receive.

Returns

Task<IList<Message>>

Messages identified by sequence number are returned. Returns null if no messages are found. Throws if the messages have not been deferred.

See Also

ReceiveDeferredMessageAsync(long)

Receives a specific deferred message identified by sequenceNumber.

public Task<Message> ReceiveDeferredMessageAsync(long sequenceNumber)

Parameters

sequenceNumber long

The sequence number of the message that will be received.

Returns

Task<Message>

Message identified by sequence number sequenceNumber. Returns null if no such message is found. Throws if the message has not been deferred.

See Also

RegisterMessageHandler(Func<Message, CancellationToken, Task>, MessageHandlerOptions)

Receive messages continuously from the entity. Registers a message handler and begins a new thread to receive messages. This handler(Func<T1, T2, TResult>) is awaited on every time a new message is received by the receiver.

public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions)

Parameters

handler Func<Message, CancellationToken, Task>

A Func<T1, T2, TResult> that processes messages.

messageHandlerOptions MessageHandlerOptions

The MessageHandlerOptions options used to configure the settings of the pump.

Remarks

Enable prefetch to speed up the receive rate.

RegisterMessageHandler(Func<Message, CancellationToken, Task>, Func<ExceptionReceivedEventArgs, Task>)

Receive messages continuously from the entity. Registers a message handler and begins a new thread to receive messages. This handler(Func<T1, T2, TResult>) is awaited on every time a new message is received by the receiver.

public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)

Parameters

handler Func<Message, CancellationToken, Task>

A Func<T1, T2, TResult> that processes messages.

exceptionReceivedHandler Func<ExceptionReceivedEventArgs, Task>

A Func<T, TResult> that is used to notify exceptions.

RegisterPlugin(ServiceBusPlugin)

Registers a ServiceBusPlugin to be used with this receiver.

public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)

Parameters

serviceBusPlugin ServiceBusPlugin

RenewLockAsync(Message)

Renews the lock on the message specified by the lock token. The lock will be renewed based on the setting specified on the queue.

public Task RenewLockAsync(Message message)

Parameters

message Message

Returns

Task

Remarks

When a message is received in PeekLock mode, the message is locked on the server for this receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, it resets the time the message is locked by the LockDuration set on the Entity.

RenewLockAsync(string)

Renews the lock on the message. The lock will be renewed based on the setting specified on the queue. New lock token expiry date and time in UTC format.

public Task<DateTime> RenewLockAsync(string lockToken)

Parameters

lockToken string

Lock token associated with the message.

Returns

Task<DateTime>

Remarks

When a message is received in PeekLock mode, the message is locked on the server for this receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration). If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, it resets the time the message is locked by the LockDuration set on the Entity.

UnregisterPlugin(string)

Unregisters a ServiceBusPlugin.

public override void UnregisterPlugin(string serviceBusPluginName)

Parameters

serviceBusPluginName string

The Name of the plugin to be unregistered.