// LimitedConcurrencyLevelTaskScheduler.cs
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Threading;
  using System.Threading.Tasks;
  namespace WebDAVSharp.Server.Utilities
  {
      /// <summary>
      /// Provides a task scheduler that ensures a maximum concurrency level while running on top of the thread pool.
      /// </summary>
      /// <remarks>
      /// Queued tasks are kept in a FIFO list. Up to <see cref="MaximumConcurrencyLevel"/> worker delegates are
      /// posted to the <see cref="ThreadPool"/>; each one drains the list until it is empty and then exits.
      /// </remarks>
      public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
      {
          // True while the current thread is inside a worker delegate that is draining the queue.
          // Thread-static, so each thread sees its own value; consulted by TryExecuteTaskInline.
          [ThreadStatic] private static bool _currentThreadIsProcessingItems;

          // The maximum concurrency level allowed by this scheduler.
          private readonly int _maxDegreeOfParallelism;

          // The list of tasks waiting to be executed. Also serves as the lock object: protected by lock(_tasks).
          private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

          // Number of worker delegates currently queued to, or running on, the thread pool.
          // Only read or written while holding lock(_tasks).
          private int _delegatesQueuedOrRunning;

          /// <summary>
          /// Creates a new instance with the specified degree of parallelism.
          /// </summary>
          /// <param name="maxDegreeOfParallelism">The maximum number of tasks allowed to run concurrently; must be at least 1.</param>
          /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is less than 1.</exception>
          public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
          {
              if (maxDegreeOfParallelism < 1)
              {
                  throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
              }

              _maxDegreeOfParallelism = maxDegreeOfParallelism;
          }

          /// <summary>
          /// Gets the maximum concurrency level supported by this scheduler.
          /// </summary>
          public sealed override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;

          /// <summary>
          /// Queues a task to the scheduler.
          /// </summary>
          /// <param name="task">The task to append to the pending list.</param>
          protected sealed override void QueueTask(Task task)
          {
              // Add the task to the list of tasks to be processed. If there aren't enough
              // delegates currently queued or running to process tasks, schedule another.
              lock (_tasks)
              {
                  _tasks.AddLast(task);
                  if (_delegatesQueuedOrRunning >= _maxDegreeOfParallelism)
                  {
                      // Enough workers already exist; one of them will pick this task up.
                      return;
                  }

                  ++_delegatesQueuedOrRunning;
                  NotifyThreadPoolOfPendingWork();
              }
          }

          // Posts one worker delegate to the ThreadPool. The worker repeatedly dequeues and runs
          // tasks until the list is empty, then decrements the worker count and exits.
          private void NotifyThreadPoolOfPendingWork()
          {
              ThreadPool.UnsafeQueueUserWorkItem(_ =>
              {
                  // Note that the current thread is now processing work items.
                  // This is necessary to enable inlining of tasks into this thread.
                  _currentThreadIsProcessingItems = true;
                  try
                  {
                      // Process all available items in the queue.
                      while (true)
                      {
                          Task item;
                          lock (_tasks)
                          {
                              // When there are no more items to be processed,
                              // note that we're done processing, and get out.
                              if (_tasks.Count == 0)
                              {
                                  --_delegatesQueuedOrRunning;
                                  break;
                              }

                              // Get the next item from the queue.
                              item = _tasks.First.Value;
                              _tasks.RemoveFirst();
                          }

                          // Execute the task outside the lock so other threads can queue/dequeue meanwhile.
                          TryExecuteTask(item);
                      }
                  }
                  finally
                  {
                      // We're done processing items on the current thread.
                      _currentThreadIsProcessingItems = false;
                  }
              }, null);
          }

          /// <summary>
          /// Attempts to execute the specified task on the current thread.
          /// </summary>
          /// <param name="task">The task to execute.</param>
          /// <param name="taskWasPreviouslyQueued">
          /// <c>true</c> if the task was already queued to this scheduler, in which case it is removed from the
          /// pending list before being run.
          /// </param>
          /// <returns>
          /// <c>true</c> if the task was executed inline; <c>false</c> if the current thread is not one of this
          /// scheduler's workers, if a previously queued task could no longer be removed from the list, or if
          /// <see cref="TaskScheduler.TryExecuteTask"/> returned <c>false</c>.
          /// </returns>
          protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
          {
              // If this thread isn't already processing a task, we don't support inlining.
              if (!_currentThreadIsProcessingItems)
              {
                  return false;
              }

              // A previously queued task must be removed from the list first, so that a worker
              // does not also pick it up later; only run it if the removal succeeded.
              if (taskWasPreviouslyQueued)
              {
                  return TryDequeue(task) && TryExecuteTask(task);
              }

              return TryExecuteTask(task);
          }

          /// <summary>
          /// Attempt to remove a previously scheduled task from the scheduler.
          /// </summary>
          /// <param name="task">The task to remove from the pending list.</param>
          /// <returns><c>true</c> if the task was found in the list and removed; otherwise <c>false</c>.</returns>
          protected sealed override bool TryDequeue(Task task)
          {
              lock (_tasks)
              {
                  return _tasks.Remove(task);
              }
          }

          /// <summary>
          /// Gets an enumerable of the tasks currently scheduled on this scheduler.
          /// </summary>
          /// <returns>A snapshot (copy) of the pending tasks, in queue order.</returns>
          /// <exception cref="NotSupportedException">The list lock could not be acquired without blocking.</exception>
          protected sealed override IEnumerable<Task> GetScheduledTasks()
          {
              bool lockTaken = false;
              try
              {
                  // Non-blocking attempt: never wait on the lock here, fail fast instead.
                  Monitor.TryEnter(_tasks, ref lockTaken);
                  if (!lockTaken)
                  {
                      throw new NotSupportedException();
                  }

                  // Copy while the lock is held. Returning _tasks itself would let the caller
                  // enumerate the live list after the lock is released, racing with workers
                  // that add and remove nodes.
                  return _tasks.ToArray();
              }
              finally
              {
                  if (lockTaken)
                  {
                      Monitor.Exit(_tasks);
                  }
              }
          }
      }
  }