Scaling Connections with BlockingCollection()
I have a server which communicates with 50 or more devices over TCP LAN. There is a Task.Run for each socket reading message loop.
I buffer each message reach into a blocking queue, where each blocking queue has a Task.Run using a BlockingCollection.Take().
So something like (semi-pseudocode):
Socket Reading Task
Task.Run(() =>
{
while (notCancelled)
{
element = ReadXml();
switch (element)
{
case messageheader:
MessageBlockingQueue.Add(deserialze<messageType>());
...
}
}
});
Message Buffer Task
Task.Run(() =>
{
while (notCancelled)
{
Process(MessageQueue.Take());
}
});
So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.
I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.
Is this an inefficient way to handle it? what would be a better way?
c# multithreading performance networking parallel-processing
add a comment |
I have a server which communicates with 50 or more devices over TCP LAN. There is a Task.Run for each socket reading message loop.
I buffer each message reach into a blocking queue, where each blocking queue has a Task.Run using a BlockingCollection.Take().
So something like (semi-pseudocode):
Socket Reading Task
Task.Run(() =>
{
while (notCancelled)
{
element = ReadXml();
switch (element)
{
case messageheader:
MessageBlockingQueue.Add(deserialze<messageType>());
...
}
}
});
Message Buffer Task
Task.Run(() =>
{
while (notCancelled)
{
Process(MessageQueue.Take());
}
});
So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.
I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.
Is this an inefficient way to handle it? what would be a better way?
c# multithreading performance networking parallel-processing
What do you do with these messages
– TheGeneral
Nov 20 at 9:30
@TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
– FinalFortune
Nov 20 at 16:06
add a comment |
I have a server which communicates with 50 or more devices over TCP LAN. There is a Task.Run for each socket reading message loop.
I buffer each message reach into a blocking queue, where each blocking queue has a Task.Run using a BlockingCollection.Take().
So something like (semi-pseudocode):
Socket Reading Task
Task.Run(() =>
{
while (notCancelled)
{
element = ReadXml();
switch (element)
{
case messageheader:
MessageBlockingQueue.Add(deserialze<messageType>());
...
}
}
});
Message Buffer Task
Task.Run(() =>
{
while (notCancelled)
{
Process(MessageQueue.Take());
}
});
So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.
I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.
Is this an inefficient way to handle it? what would be a better way?
c# multithreading performance networking parallel-processing
I have a server which communicates with 50 or more devices over TCP LAN. There is a Task.Run for each socket reading message loop.
I buffer each message reach into a blocking queue, where each blocking queue has a Task.Run using a BlockingCollection.Take().
So something like (semi-pseudocode):
Socket Reading Task
Task.Run(() =>
{
while (notCancelled)
{
element = ReadXml();
switch (element)
{
case messageheader:
MessageBlockingQueue.Add(deserialze<messageType>());
...
}
}
});
Message Buffer Task
Task.Run(() =>
{
while (notCancelled)
{
Process(MessageQueue.Take());
}
});
So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.
I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.
Is this an inefficient way to handle it? what would be a better way?
c# multithreading performance networking parallel-processing
c# multithreading performance networking parallel-processing
asked Nov 20 at 8:43
FinalFortune
136514
136514
What do you do with these messages
– TheGeneral
Nov 20 at 9:30
@TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
– FinalFortune
Nov 20 at 16:06
add a comment |
What do you do with these messages
– TheGeneral
Nov 20 at 9:30
@TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
– FinalFortune
Nov 20 at 16:06
What do you do with these messages
– TheGeneral
Nov 20 at 9:30
What do you do with these messages
– TheGeneral
Nov 20 at 9:30
@TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
– FinalFortune
Nov 20 at 16:06
@TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
– FinalFortune
Nov 20 at 16:06
add a comment |
3 Answers
3
active
oldest
votes
You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provider asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.
Your read loop would become:
while (notCancelled) {
var next = await queue.Reader.ReadAsync(optionalCancellationToken);
Process(next);
}
and the producer:
switch (element)
{
case messageheader:
queue.Writer.TryWrite(deserialze<messageType>());
...
}
so: minimal changes
Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
add a comment |
I actually do something similar in another project. What I learned or would do differently are the following:
First of all, better to use dedicated threads for the reading/writing loop (with
new Thread(ParameterizedThreadStart)
) becauseTask.Run
uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Your
Process
can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
So the OnMessageReceived
implementation can be:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Finally it was a big lesson that
BlockingCollection<T>
has some performance issues. It usesSpinWait
internally, whoseSpinOnce
method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fastBlockingCollection
implementation using anAutoResetEvent
for triggering incoming data. I added aTake(CancellationToken)
overload to it as follows:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection
also will do it.
add a comment |
Yes, this is a bit inefficient, because you block ThreadPool threads.
I already discussed this problem Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern
You can also look at examples with testing a producer -consumer pattern here:
https://github.com/BBGONE/TestThreadAffinity
You can use await Task.Yield in the loop to give other tasks access to this thread.
You can solve it also by using dedicated threads or better a custom ThreadScheduler which uses its own thread pool. But it is ineffective to create 50+ plain threads. Better to adjust the task, so it would be more cooperative.
If you use a BlockingCollection (because it can block the thread for long while waiting to write (if bounded) or to read or no items to read) then it is better to use System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
They don't block the thread while waiting when the collection will be available to write or to read. There's an example how it is used https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53389154%2fscaling-connections-with-blockingcollectiont%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provider asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.
Your read loop would become:
while (notCancelled) {
var next = await queue.Reader.ReadAsync(optionalCancellationToken);
Process(next);
}
and the producer:
switch (element)
{
case messageheader:
queue.Writer.TryWrite(deserialze<messageType>());
...
}
so: minimal changes
Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
add a comment |
You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provider asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.
Your read loop would become:
while (notCancelled) {
var next = await queue.Reader.ReadAsync(optionalCancellationToken);
Process(next);
}
and the producer:
switch (element)
{
case messageheader:
queue.Writer.TryWrite(deserialze<messageType>());
...
}
so: minimal changes
Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
add a comment |
You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provider asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.
Your read loop would become:
while (notCancelled) {
var next = await queue.Reader.ReadAsync(optionalCancellationToken);
Process(next);
}
and the producer:
switch (element)
{
case messageheader:
queue.Writer.TryWrite(deserialze<messageType>());
...
}
so: minimal changes
Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
You may be interested in the "channels" work, in particular: System.Threading.Channels. The aim of this is to provider asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, upper limits, etc. By using an asynchronous API, you aren't tying up lots of threads just waiting for something to do.
Your read loop would become:
while (notCancelled) {
var next = await queue.Reader.ReadAsync(optionalCancellationToken);
Process(next);
}
and the producer:
switch (element)
{
case messageheader:
queue.Writer.TryWrite(deserialze<messageType>());
...
}
so: minimal changes
Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
edited Nov 20 at 11:03
answered Nov 20 at 10:44
Marc Gravell♦
776k19121252537
776k19121252537
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
add a comment |
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
Interesting, will be taking a look at all of them. I was looking at TPL Dataflow, but seems like this is the better option.
– FinalFortune
Nov 22 at 6:18
add a comment |
I actually do something similar in another project. What I learned or would do differently are the following:
First of all, better to use dedicated threads for the reading/writing loop (with
new Thread(ParameterizedThreadStart)
) becauseTask.Run
uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Your
Process
can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
So the OnMessageReceived
implementation can be:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Finally it was a big lesson that
BlockingCollection<T>
has some performance issues. It usesSpinWait
internally, whoseSpinOnce
method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fastBlockingCollection
implementation using anAutoResetEvent
for triggering incoming data. I added aTake(CancellationToken)
overload to it as follows:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection
also will do it.
add a comment |
I actually do something similar in another project. What I learned or would do differently are the following:
First of all, better to use dedicated threads for the reading/writing loop (with
new Thread(ParameterizedThreadStart)
) becauseTask.Run
uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Your
Process
can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
So the OnMessageReceived
implementation can be:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Finally it was a big lesson that
BlockingCollection<T>
has some performance issues. It usesSpinWait
internally, whoseSpinOnce
method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fastBlockingCollection
implementation using anAutoResetEvent
for triggering incoming data. I added aTake(CancellationToken)
overload to it as follows:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection
also will do it.
add a comment |
I actually do something similar in another project. What I learned or would do differently are the following:
First of all, better to use dedicated threads for the reading/writing loop (with
new Thread(ParameterizedThreadStart)
) becauseTask.Run
uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Your
Process
can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
So the OnMessageReceived
implementation can be:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Finally it was a big lesson that
BlockingCollection<T>
has some performance issues. It usesSpinWait
internally, whoseSpinOnce
method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fastBlockingCollection
implementation using anAutoResetEvent
for triggering incoming data. I added aTake(CancellationToken)
overload to it as follows:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection
also will do it.
I actually do something similar in another project. What I learned or would do differently are the following:
First of all, better to use dedicated threads for the reading/writing loop (with
new Thread(ParameterizedThreadStart)
) becauseTask.Run
uses a pool thread and as you use it in a (nearly) endless loop the thread is practically never returned to the pool.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Your
Process
can be an event, which you can invoke asynchronously so your reader loop can be return immediately to process the new incoming packages as fast as possible:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Please note that if a delegate has multiple targets it's async invocation is not trivial. I created this extension method for invoking a delegate on pool threads:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
So the OnMessageReceived
implementation can be:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Finally it was a big lesson that
BlockingCollection<T>
has some performance issues. It usesSpinWait
internally, whoseSpinOnce
method waits longer and longer times if there is no incoming data for a long time. This is a tricky issue because even if you log every single step of the processing you will not notice that everything is started delayed unless you can mock also the server side. Here you can find a fastBlockingCollection
implementation using anAutoResetEvent
for triggering incoming data. I added aTake(CancellationToken)
overload to it as follows:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
Basically that's it. Maybe not everything is applicable in your case, eg. if the nearly immediate response is not crucial the regular BlockingCollection
also will do it.
edited Nov 20 at 9:59
answered Nov 20 at 9:53
taffer
7,97521536
7,97521536
add a comment |
add a comment |
Yes, this is a bit inefficient, because you block ThreadPool threads.
I already discussed this problem Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern
You can also look at examples with testing a producer -consumer pattern here:
https://github.com/BBGONE/TestThreadAffinity
You can use await Task.Yield in the loop to give other tasks access to this thread.
You can solve it also by using dedicated threads or better a custom ThreadScheduler which uses its own thread pool. But it is ineffective to create 50+ plain threads. Better to adjust the task, so it would be more cooperative.
If you use a BlockingCollection (because it can block the thread for long while waiting to write (if bounded) or to read or no items to read) then it is better to use System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
They don't block the thread while waiting when the collection will be available to write or to read. There's an example how it is used https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
add a comment |
Yes, this is a bit inefficient, because you block ThreadPool threads.
I already discussed this problem Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern
You can also look at examples with testing a producer -consumer pattern here:
https://github.com/BBGONE/TestThreadAffinity
You can use await Task.Yield in the loop to give other tasks access to this thread.
You can solve it also by using dedicated threads or better a custom ThreadScheduler which uses its own thread pool. But it is ineffective to create 50+ plain threads. Better to adjust the task, so it would be more cooperative.
If you use a BlockingCollection (because it can block the thread for long while waiting to write (if bounded) or to read or no items to read) then it is better to use System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
They don't block the thread while waiting when the collection will be available to write or to read. There's an example how it is used https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
add a comment |
Yes, this is a bit inefficient, because you block ThreadPool threads.
I already discussed this problem Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern
You can also look at examples with testing a producer -consumer pattern here:
https://github.com/BBGONE/TestThreadAffinity
You can use await Task.Yield in the loop to give other tasks access to this thread.
You can solve it also by using dedicated threads or better a custom ThreadScheduler which uses its own thread pool. But it is ineffective to create 50+ plain threads. Better to adjust the task, so it would be more cooperative.
If you use a BlockingCollection (because it can block the thread for long while waiting to write (if bounded) or to read or no items to read) then it is better to use System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
They don't block the thread while waiting when the collection will be available to write or to read. There's an example how it is used https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
Yes, this is a bit inefficient, because you block ThreadPool threads.
I already discussed this problem Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern
You can also look at examples with testing a producer -consumer pattern here:
https://github.com/BBGONE/TestThreadAffinity
You can use await Task.Yield in the loop to give other tasks access to this thread.
You can solve it also by using dedicated threads or better a custom ThreadScheduler which uses its own thread pool. But it is ineffective to create 50+ plain threads. Better to adjust the task, so it would be more cooperative.
If you use a BlockingCollection (because it can block the thread for long while waiting to write (if bounded) or to read or no items to read) then it is better to use System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md
They don't block the thread while waiting when the collection will be available to write or to read. There's an example how it is used https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest
edited Nov 20 at 11:00
answered Nov 20 at 10:10
Maxim T
947
947
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53389154%2fscaling-connections-with-blockingcollectiont%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
What do you do with these messages
– TheGeneral
Nov 20 at 9:30
@TheGeneral Database storage, logging and sending out the data to clients, but some data does require extra processing.
– FinalFortune
Nov 20 at 16:06