Browse Source

Tested CompServ; PictureMover + m key;

HOME 3 months ago
parent
commit
5678d9d726

+ 12 - 2
CompressService/CompServ.ClientLibrary/CompServHubClient.cs

@@ -4,16 +4,26 @@ namespace CompServ.ClientLibrary;
 
 public class CompServHubClient(string server) : CompServClient(server, ApiPathHubRootForCheckAlive, AliveMessageHub)
 {
-    public async Task RegisterWorker(int port)
+    public async Task RegisterWorker(int port, int ratio)
     {
+        var msg = ModelExtensionMethod.BuildWorkerRegisterRequestMessage(port, ratio);
+        var httpResponseMessage = await new HttpClient() { BaseAddress = ServerUri }.SendAsync(msg);
+        httpResponseMessage.EnsureSuccessStatusCode();
     }
 
     public async Task UnRegisterWorker(int port)
     {
+        var msg = ModelExtensionMethod.BuildWorkerUnRegisterRequestMessage(port);
+        var httpResponseMessage = await new HttpClient() { BaseAddress = ServerUri }.SendAsync(msg);
+        httpResponseMessage.EnsureSuccessStatusCode();
     }
 
     public async Task<string> GetWorker()
     {
-        throw new NotImplementedException();
+        var msg = ModelExtensionMethod.BuildClientGetWorkerRequest();
+        var httpResponseMessage = await new HttpClient() { BaseAddress = ServerUri }.SendAsync(msg);
+        httpResponseMessage.EnsureSuccessStatusCode();
+        var workerUrl = await httpResponseMessage.Content.ReadAsStringAsync();
+        return workerUrl;
     }
 }

+ 47 - 0
CompressService/CompServ.Hub/HubController.cs

@@ -0,0 +1,47 @@
+using Microsoft.AspNetCore.Mvc;
+using static CompServ.CompServConst;
+
+namespace CompServ.Hub
+{
+    [ApiController]
+    public class HubController(HubWorkerHolder hubWorkerHolder) : ControllerBase
+    {
+        [HttpGet]
+        [Route(ApiPathHubRootForCheckAlive)]
+        public ActionResult CheckAlive() => Content(AliveMessageHub);
+
+        [HttpPost]
+        [Route(ApiPathHubRegisterWorker)]
+        public async Task<ActionResult> Register()
+        {
+            var wr = HttpContext.Request.ExtractWorkerRegister();
+            if (wr.port == null || wr.ratio == null) return StatusCode(400, "missing required header");
+
+            var url = $"http://{Request.HttpContext.Connection.RemoteIpAddress}:{wr.port}";
+            hubWorkerHolder.RegisterWorker(url, wr.ratio.Value);
+
+            return Empty;
+        }
+
+        [HttpPost]
+        [Route(ApiPathHubUnRegisterWorker)]
+        public async Task<ActionResult> UnRegister()
+        {
+            var wr = HttpContext.Request.ExtractWorkerRegister();
+            if (wr.port == null) return StatusCode(400, "missing required header");
+
+            var url = $"http://{Request.HttpContext.Connection.RemoteIpAddress}:{wr.port}";
+            hubWorkerHolder.UnRegisterWorker(url);
+
+            return Empty;
+        }
+
+        [HttpGet]
+        [Route(ApiPathHubGetWorker)]
+        public async Task<ActionResult> GetWorker()
+        {
+            var url = await hubWorkerHolder.GetWorkerAsync();
+            return Content(url);
+        }
+    }
+}

+ 10 - 6
CompressService/CompServ.Hub/Program.cs

@@ -1,4 +1,6 @@
-using Microsoft.AspNetCore.Builder;
+using System.Net;
+using CompServ.Hub;
+using Microsoft.AspNetCore.Builder;
 using Microsoft.AspNetCore.Hosting;
 using Microsoft.AspNetCore.Http;
 using Microsoft.AspNetCore.Server.Kestrel.Core;
@@ -32,6 +34,7 @@ builder.Services.Configure<LoggerFilterOptions>(options =>
         if (category == "Microsoft.AspNetCore.Server.Kestrel.Http2" && level == LogLevel.Information) return false;
         if (category == "Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker" && level == LogLevel.Information) return false;
         if (category == "Microsoft.AspNetCore.Mvc.Infrastructure.ContentResultExecutor" && level == LogLevel.Information) return false;
+        if (category == "Microsoft.AspNetCore.Mvc.Infrastructure.ObjectResultExecutor" && level == LogLevel.Information) return false;
 
         return true; // 其他日志保留
     });
@@ -40,10 +43,12 @@ builder.Services.Configure<LoggerFilterOptions>(options =>
 //配置HTTP1、HTTP2端口
 builder.WebHost.UseKestrel((_, kso) =>
 {
-    kso.ListenAnyIP(0, lo => lo.Protocols = HttpProtocols.Http2);
-    kso.ListenAnyIP(0, lo => lo.Protocols = HttpProtocols.Http1);
+    kso.Listen(new IPEndPoint(IPAddress.Any, 0), lo => lo.Protocols = HttpProtocols.Http2);
+    kso.Listen(new IPEndPoint(IPAddress.Any, 0), lo => lo.Protocols = HttpProtocols.Http1);
 });
 
+builder.Services.AddSingleton<HubWorkerHolder>();
+builder.Services.AddHostedService<HubWorkerAssigner>();
 builder.Services.AddControllers();
 
 var app = builder.Build();
@@ -56,7 +61,6 @@ app.Use(async (context, next) =>
 
 app.MapControllers();
 
-app.MapGet("/", () =>"I am hub!");
+app.MapGet("/", () => "I am hub!");
 
-
-await app.RunAsync();
+await app.RunAsync();

+ 72 - 0
CompressService/CompServ.Hub/HubWorkerAssigner.cs

@@ -0,0 +1,72 @@
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace CompServ.Hub
+{
+    internal class HubWorkerAssigner(ILogger<HubWorkerAssigner> logger, HubWorkerHolder holder) : BackgroundService
+    {
+        internal int _index = 0;
+        internal int _assign = 0;
+
+        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+        {
+            logger.LogInformation("Start Assign service");
+
+            while (stoppingToken.IsCancellationRequested == false)
+            {
+                string urlToQueue = null;
+                int ratio = default;
+
+                var takeFail = false;
+                lock (holder.Workers)
+                {
+                    if (holder.Workers.Count == 0)
+                    {
+                        takeFail = true;
+                    }
+                    else
+                    {
+                        if (_index >= holder.Workers.Count)
+                        {
+                            _index = 0;
+                            _assign = 0;
+                        }
+
+                        var item = holder.Workers[_index];
+                        urlToQueue = item.url;
+                        ratio = item.ratio;
+                    }
+                }
+
+                if (takeFail)
+                {
+                    await Task.Delay(100, stoppingToken);
+                    continue;
+                }
+
+                logger.LogInformation($"Prep: {urlToQueue}");
+                await holder.Queue1.Writer.WriteAsync(urlToQueue, stoppingToken);
+
+                //handle next worker with ratio
+                _assign++;
+                if (_assign >= ratio)
+                {
+                    _index++;
+                    _assign = 0;
+
+                    lock (holder.Workers)
+                    {
+                        if (_index >= holder.Workers.Count) _index = 0;
+                    }
+                }
+            }
+        }
+    }
+}

+ 39 - 0
CompressService/CompServ.Hub/HubWorkerHolder.cs

@@ -0,0 +1,39 @@
+using System.Threading.Channels;
+using Microsoft.Extensions.Logging;
+
+namespace CompServ.Hub
+{
+    public class HubWorkerHolder(ILogger<HubWorkerHolder> logger)
+    {
+        internal readonly List<(string url, int ratio)> Workers = new();
+
+        internal readonly Channel<string> Queue1 = Channel.CreateBounded<string>(1);
+
+        public void RegisterWorker(string url, int ratio)
+        {
+            logger.LogInformation($"Register worker: {url} with ratio {ratio}");
+            lock (Workers)
+            {
+                if (Workers.Any(p => p.url == url)) return;
+                Workers.Add((url, ratio));
+            }
+        }
+
+        public void UnRegisterWorker(string url)
+        {
+            logger.LogInformation($"Un Register worker: {url}");
+            lock (Workers)
+            {
+                var found = Workers.FirstOrDefault(p => p.url == url);
+                if (found != default) Workers.Remove(found);
+            }
+        }
+
+        public async Task<string> GetWorkerAsync()
+        {
+            var url = await Queue1.Reader.ReadAsync();
+            logger.LogInformation($"Assign worker: {url}");
+            return url;
+        }
+    }
+}

+ 0 - 17
CompressService/CompServ.Hub/WorkerController.cs

@@ -1,17 +0,0 @@
-using Microsoft.AspNetCore.Mvc;
-using static CompServ.CompServConst;
-
-namespace CompServ.Hub
-{
-    [ApiController]
-    public class HubController : ControllerBase
-    {
-        [HttpGet]
-        [Route("/")]
-        public async Task<ActionResult> CheckAlive() => await Task.FromResult(Content(AliveMessageHub));
-
-
-
-
-    }
-}

+ 2 - 1
CompressService/CompServ.Shared/CompServConst.cs

@@ -11,11 +11,12 @@
 
         public const string RequestHeaderCompressLevel = "x-comp-serv-compress-level";
         public const string RequestHeaderCompressThreads = "x-comp-serv-compress-threads";
+        public const string RequestHeaderWorkerPort = "x-comp-serv-worker-port";
+        public const string RequestHeaderWorkerRatio = "x-comp-serv-worker-ratio";
 
         public const string ApiPathHubRootForCheckAlive = "/hub/";
         public const string ApiPathHubRegisterWorker = "/hub/register-worker";
         public const string ApiPathHubUnRegisterWorker = "/hub/un-register-worker";
         public const string ApiPathHubGetWorker = "/hub/get-worker";
-
     }
 }

+ 51 - 1
CompressService/CompServ.Shared/ModelExtensionMethod.cs

@@ -2,9 +2,11 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Net;
+using System.Reflection;
 using System.Text;
 using System.Threading.Tasks;
 using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Server.HttpSys;
 using static CompServ.CompServConst;
 
 namespace CompServ
@@ -39,7 +41,6 @@ namespace CompServ
                 {
                     {RequestHeaderCompressLevel,model.Level.ToString()},
                     {RequestHeaderCompressThreads,model.Threads.ToString()},
-
                 },
                 Version = HttpVersion.Version20,
                 VersionPolicy = HttpVersionPolicy.RequestVersionExact,
@@ -51,7 +52,56 @@ namespace CompServ
         {
             return new HttpRequestMessage(HttpMethod.Post, ApiPathWorkerDecompress)
             {
+                Version = HttpVersion.Version20,
+                VersionPolicy = HttpVersionPolicy.RequestVersionExact,
                 Content = new ByteArrayContent(model.DataToDecompress.Array!, model.DataToDecompress.Offset, model.DataToDecompress.Count),
+            };
+        }
+
+        public static HttpRequestMessage BuildWorkerRegisterRequestMessage(int port, int ratio)
+        {
+            return new HttpRequestMessage(HttpMethod.Post, ApiPathHubRegisterWorker)
+            {
+                Headers =
+                {
+                    { RequestHeaderWorkerPort, port.ToString() },
+                    { RequestHeaderWorkerRatio, ratio.ToString() }
+                },
+                Content = new ByteArrayContent(Array.Empty<byte>()),
+                Version = HttpVersion.Version20,
+                VersionPolicy = HttpVersionPolicy.RequestVersionExact,
+            };
+        }
+
+        public static HttpRequestMessage BuildWorkerUnRegisterRequestMessage(int port)
+        {
+            return new HttpRequestMessage(HttpMethod.Post, ApiPathHubUnRegisterWorker)
+            {
+                Headers = { { RequestHeaderWorkerPort, port.ToString() } },
+                Content = new ByteArrayContent(Array.Empty<byte>()),
+                Version = HttpVersion.Version20,
+                VersionPolicy = HttpVersionPolicy.RequestVersionExact,
+            };
+        }
+
+        public static (int? port, int? ratio) ExtractWorkerRegister(this HttpRequest request)
+        {
+            var p = int.TryParse(request.Headers[RequestHeaderWorkerPort], out var port)
+                ? (int?)port
+                : null;
+
+            var r = int.TryParse(request.Headers[RequestHeaderWorkerRatio], out var ratio)
+                ? (int?)ratio
+                : null;
+
+            return (p, r);
+        }
+
+        public static HttpRequestMessage BuildClientGetWorkerRequest()
+        {
+            return new HttpRequestMessage(HttpMethod.Get, ApiPathHubGetWorker)
+            {
+                Content = new ByteArrayContent(Array.Empty<byte>()),
                 Version = HttpVersion.Version20,
                 VersionPolicy = HttpVersionPolicy.RequestVersionExact,
             };

+ 4 - 2
CompressService/CompServ.Tests/Program.cs

@@ -5,7 +5,9 @@ using CompServ.ClientLibrary;
 
 Console.WriteLine("Hello, World!");
 
-var wClient = new CompServWorkerClient(args[0]);
+var hClient = new CompServHubClient(args[0]);
+var workerUrl = await hClient.GetWorker();
+var wClient = new CompServWorkerClient(workerUrl);
 
 Console.WriteLine("Checking alive...");
 var isAlive = await wClient.CheckAliveAsync();
@@ -20,7 +22,7 @@ if (isAlive)
 
     var compressedBytes = await wClient.CompressAsync(new CompressRequestModel { Level = 1, Threads = 1, DataToCompress = dataToCompress1 });
 
-    var decompress = await wClient.DecompressAsync(compressedBytes);
+    var decompress = await wClient.DecompressAsync(new DecompressRequestModel { DataToDecompress = compressedBytes });
 
     int bp = 0;
 }

+ 1 - 1
CompressService/CompServ.Tests/Properties/launchSettings.json

@@ -2,7 +2,7 @@
   "profiles": {
     "CompServ.Tests": {
       "commandName": "Project",
-      "commandLineArgs": "http://localhost:11076"
+      "commandLineArgs": "http://localhost:49700"
     }
   }
 }

+ 5 - 3
CompressService/CompServ.Worker/HubRegister.cs

@@ -16,11 +16,13 @@ internal class HubRegister(IServiceProvider serviceProvider, ILogger<HubRegister
         var saf = svr.Features.GetRequiredFeature<IServerAddressesFeature>();
         _listeningPort = new Uri(saf.Addresses.First()).Port;
 
-        logger.LogWarning($"TODO: Register to hub <{StatusHolder.HubServer}> with port <{_listeningPort}> and ratio <{StatusHolder.Ratio}>");
+        logger.LogInformation($"Register to hub <{StatusHolder.HubServer}> with port <{_listeningPort}> and ratio <{StatusHolder.Ratio}>");
+        await new ClientLibrary.CompServHubClient(StatusHolder.HubServer!).RegisterWorker(_listeningPort, StatusHolder.Ratio);
     }
 
     public async Task UnRegister()
     {
-        logger.LogWarning($"TODO: UnRegister to hub <{StatusHolder.HubServer}> with port <{_listeningPort}>");
+        logger.LogInformation($"UnRegister to hub <{StatusHolder.HubServer}> with port <{_listeningPort}>");
+        await new ClientLibrary.CompServHubClient(StatusHolder.HubServer!).UnRegisterWorker(_listeningPort);
     }
-}
+}

+ 1 - 1
CompressService/CompServ.Worker/Properties/launchSettings.json

@@ -2,7 +2,7 @@
   "profiles": {
     "CompServ.Worker": {
       "commandName": "Project",
-      "commandLineArgs": "http://192.68.5.33:7757 2"
+      "commandLineArgs": "http://127.0.0.1:49700 2"
     },
     "CompServ.Worker blank cli": {
       "commandName": "Project"      

+ 13 - 4
CompressService/CompServ.Worker/Program.cs

@@ -12,6 +12,7 @@ using CompServ.ClientLibrary;
 using Microsoft.AspNetCore.Hosting.Server.Features;
 using Microsoft.AspNetCore.Hosting.Server;
 using Microsoft.AspNetCore.Http.Features;
+using System.Net;
 
 if (args.Length > 0)
 {
@@ -54,8 +55,8 @@ builder.Services.Configure<LoggerFilterOptions>(options =>
 //配置HTTP1、HTTP2端口
 builder.WebHost.UseKestrel((_, kso) =>
 {
-    kso.ListenAnyIP(0, lo => lo.Protocols = HttpProtocols.Http2);
-    kso.ListenAnyIP(0, lo => lo.Protocols = HttpProtocols.Http1);
+    kso.Listen(new IPEndPoint(IPAddress.Any, 0), lo => lo.Protocols = HttpProtocols.Http2);
+    kso.Listen(new IPEndPoint(IPAddress.Any, 0), lo => lo.Protocols = HttpProtocols.Http1);
 });
 
 builder.Services.Configure<KestrelServerOptions>(options =>
@@ -100,9 +101,17 @@ await app.StartAsync();
     var svr = app.Services.GetRequiredService<IServer>();
     var saf = svr.Features.GetRequiredFeature<IServerAddressesFeature>();
     var listeningPort = new Uri(saf.Addresses.First()).Port;
-
-    var client = new CompServWorkerClient("http://localhost:" + listeningPort);
+    
+    tryAgain:
+    var client = new CompServWorkerClient("http://127.0.0.1:" + listeningPort);
     var r = await client.CheckAliveAsync();
+    if (r == false)
+    {
+        logger.LogWarning("Self check resunt: " + r + ", try again after 1 second");
+        await Task.Delay(1000);
+        goto tryAgain;
+    }
+
     logger.LogInformation("Self check resunt: " + r);
 }
 

+ 7 - 0
PictureMover/CoverForm.cs

@@ -70,6 +70,13 @@ namespace PictureMover
                     MoveToOuterDotSuffixDir(src, "s");
                     break;
                 }
+                case Keys.M:
+                {
+                    var src = _currentPath;
+                    await NavFile(true);
+                    MoveToOuterDotSuffixDir(src, "m");
+                    break;
+                }
             }
         }
 

+ 1 - 1
StrangeTools.sln

@@ -64,7 +64,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Older", "Older", "{84ACDDE9
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PictureMover", "PictureMover\PictureMover.csproj", "{C94C8E6E-1051-49AD-A539-E0BC8D365C59}"
 EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "A2024", "A2024", "{3120ADE6-C606-42F1-9AA8-B7F1A8933CD7}"
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "2024", "2024", "{3120ADE6-C606-42F1-9AA8-B7F1A8933CD7}"
 EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution