HubWorkerAssigner.cs 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. using Microsoft.Extensions.Hosting;
  2. using Microsoft.Extensions.Logging;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reflection;
  8. using System.Text;
  9. using System.Threading.Tasks;
  namespace CompServ.Hub
  {
      /// <summary>
      /// Background service that repeatedly takes the current entry of
      /// <c>holder.Workers</c> and writes its url to <c>holder.Queue1</c>.
      /// The same worker is used <c>ratio</c> times in a row (at least once)
      /// before the service advances to the next entry, wrapping to the first
      /// entry after the last one.
      /// </summary>
      internal class HubWorkerAssigner(ILogger<HubWorkerAssigner> logger, HubWorkerHolder holder) : BackgroundService
      {
          /// <summary>Delay, in milliseconds, before re-checking an empty worker list.</summary>
          private const int EmptyListDelayMs = 100;

          /// <summary>Position in <c>holder.Workers</c> of the worker currently being assigned.</summary>
          internal int _index = 0;

          /// <summary>Number of consecutive writes already made for the worker at <see cref="_index"/>.</summary>
          internal int _assign = 0;

          /// <summary>
          /// Runs the assignment loop until <paramref name="stoppingToken"/> is cancelled.
          /// </summary>
          /// <param name="stoppingToken">
          /// Token checked at the top of every iteration and passed to the delay and
          /// the queue write; cancelling it while either is pending surfaces as an
          /// <see cref="OperationCanceledException"/> from this method.
          /// </param>
          /// <returns>A task that completes when the loop exits.</returns>
          protected override async Task ExecuteAsync(CancellationToken stoppingToken)
          {
              logger.LogInformation("Start Assign service");

              while (!stoppingToken.IsCancellationRequested)
              {
                  string urlToQueue = string.Empty;
                  int ratio = 0;
                  bool hasWorker = false;

                  // Copy what we need while holding the lock; awaiting is not
                  // permitted inside a lock statement, so the write happens afterwards.
                  // NOTE(review): presumably other components mutate holder.Workers
                  // under this same lock - confirm against HubWorkerHolder's users.
                  lock (holder.Workers)
                  {
                      if (holder.Workers.Count > 0)
                      {
                          // The list may have shrunk since the last iteration:
                          // restart the rotation from the first worker.
                          if (_index >= holder.Workers.Count)
                          {
                              _index = 0;
                              _assign = 0;
                          }

                          var worker = holder.Workers[_index];
                          urlToQueue = worker.url;
                          ratio = worker.ratio;
                          hasWorker = true;
                      }
                  }

                  if (!hasWorker)
                  {
                      // Nothing to assign yet; back off briefly instead of spinning.
                      await Task.Delay(EmptyListDelayMs, stoppingToken);
                      continue;
                  }

                  // Message template instead of an interpolated string, so the url is
                  // captured as a structured value; the rendered text is unchanged.
                  logger.LogInformation("Prep: {Url}", urlToQueue);
                  await holder.Queue1.Writer.WriteAsync(urlToQueue, stoppingToken);

                  // Move on once this worker has received its share. A ratio of zero
                  // or less behaves like 1, because the counter is incremented first.
                  _assign++;
                  if (_assign >= ratio)
                  {
                      _index++;
                      _assign = 0;

                      lock (holder.Workers)
                      {
                          // Wrap around after the last worker.
                          if (_index >= holder.Workers.Count)
                          {
                              _index = 0;
                          }
                      }
                  }
              }
          }
      }
  }