Browse Source

Import: PubSub and Service

HOME 3 years ago
parent
commit
b555213349

+ 22 - 0
VCommon.PubSub/PubSubBase.cs

@@ -0,0 +1,22 @@
+using StackExchange.Redis;
+using System;
+
+namespace VCommon.PubSub
+{
+    public class PubSubBase : IDisposable
+    {
+        protected readonly string ChannelName;
+        internal readonly ConnectionMultiplexer Conn;
+
+        public PubSubBase(string server, string channelName)
+        {
+            ChannelName = channelName;
+            Conn = ConnectionMultiplexer.Connect(server);
+        }
+
+        public virtual void Dispose()
+        {
+            Conn?.Dispose();
+        }
+    }
+}

+ 17 - 0
VCommon.PubSub/Publisher.cs

@@ -0,0 +1,17 @@
+using VCommon.Logging;
+
+namespace VCommon.PubSub
+{
+    public class Publisher : PubSubBase
+    {
+        public Publisher(string server, string channelName) : base(server, channelName)
+        {
+        }
+
+        protected void Publish(string message)
+        {
+            Logger.Trace($"{nameof(PubSub)}:{nameof(Publish)}", new { ChannelName, message });
+            Conn.GetSubscriber().Publish(ChannelName, message);
+        }
+    }
+}

+ 47 - 0
VCommon.PubSub/Subscriber.cs

@@ -0,0 +1,47 @@
+using StackExchange.Redis;
+using System;
+using VCommon.Logging;
+using VCommon.Services;
+
+namespace VCommon.PubSub
+{
+    public abstract class Subscriber : PubSubBase, IService
+    {
+        protected Subscriber(string server, string channelName) : base(server, channelName)
+        {
+        }
+
+        private void OnMessageArrival(RedisChannel channel, RedisValue content)
+        {
+            Logger.Trace($"{nameof(PubSub)}:{nameof(Subscriber)} Event", new { ChannelName, Content = (string)content });
+            try
+            {
+                HandleEvent(content);
+            }
+            catch (Exception exception)
+            {
+                Logger.Error($"{nameof(PubSub)}:{nameof(Subscriber)} Error on handling event", new { ChannelName, exception });
+            }
+        }
+
+        public override void Dispose()
+        {
+            Stop();
+            base.Dispose();
+        }
+
+        protected abstract void HandleEvent(string message);
+
+        public void Start()
+        {
+            Logger.Trace($"{nameof(PubSub)}:{nameof(Subscriber)} Start", ChannelName);
+            Conn.GetSubscriber().Subscribe(ChannelName, OnMessageArrival);
+        }
+
+        public void Stop()
+        {
+            Logger.Trace($"{nameof(PubSub)}:{nameof(Subscriber)} Stop", ChannelName);
+            Conn.GetSubscriber().Unsubscribe(ChannelName, OnMessageArrival);
+        }
+    }
+}

+ 17 - 0
VCommon.PubSub/VCommon.PubSub.csproj

@@ -0,0 +1,17 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net5.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="StackExchange.Redis" Version="2.2.62" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\VCommon.Json\VCommon.Json.csproj" />
+    <ProjectReference Include="..\VCommon.Services\VCommon.Services.csproj" />
+    <ProjectReference Include="..\VCommon\VCommon.csproj" />
+  </ItemGroup>
+
+</Project>

+ 47 - 0
VCommon.Services/CycleService.cs

@@ -0,0 +1,47 @@
+using System;
+using System.Threading;
+using VCommon.Logging;
+
+namespace VCommon.Services
+{
+    public abstract class CycleService : IService
+    {
+        private Thread _thread;
+        private bool _isRunning;
+
+        protected int SecLoopInterval = 30;
+        protected int MsWaitStop = 5000;
+
+        private void CycleRun()
+        {
+            while (_isRunning)
+            {
+                try
+                {
+                    Run();
+                }
+                catch (Exception e)
+                {
+                    Logger.Error("Exception in CycleRun", e);
+                }
+                for (var i = 0; i < SecLoopInterval && _isRunning; i++) Thread.Sleep(1000);
+            }
+        }
+
+        protected abstract void Run();
+
+        public void Start()
+        {
+            _isRunning = true;
+            _thread = new Thread(CycleRun) { Name = GetType().Name };
+            _thread.Start();
+        }
+
+        public void Stop()
+        {
+            _isRunning = false;
+            _thread.Join(MsWaitStop);
+            _thread.Abort();
+        }
+    }
+}

+ 9 - 0
VCommon.Services/IService.cs

@@ -0,0 +1,9 @@
+namespace VCommon.Services
+{
+    public interface IService
+    {
+        void Start();
+
+        void Stop();
+    }
+}

+ 50 - 0
VCommon.Services/ThreadedServiceBase.cs

@@ -0,0 +1,50 @@
+using System.Threading;
+
+namespace VCommon.Services
+{
+    public abstract class ThreadedServiceBase : IService
+    {
+        private Thread _innerThread;
+
+        public string Tag { get; }
+        public bool IsBackground { get; }
+
+        protected ThreadedServiceBase(string tag = null, bool isBackground = true)
+        {
+            Tag = tag;
+            IsBackground = isBackground;
+        }
+
+        public void Start()
+        {
+            if (null != _innerThread) return;
+
+            OnStart();
+
+            _innerThread = new Thread(Run) { IsBackground = IsBackground };
+            if (null != Tag) _innerThread.Name = Tag;
+            _innerThread.Start();
+        }
+
+        protected void Join(int ms) => _innerThread.Join(ms);
+
+        public void Stop()
+        {
+            if (null == _innerThread) return;
+            OnStop();
+
+            _innerThread.Abort();
+            _innerThread = null;
+        }
+
+        protected virtual void OnStop()
+        {
+        }
+
+        protected virtual void OnStart()
+        {
+        }
+
+        protected abstract void Run();
+    }
+}

+ 11 - 0
VCommon.Services/VCommon.Services.csproj

@@ -0,0 +1,11 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net5.0</TargetFramework>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\VCommon\VCommon.csproj" />
+  </ItemGroup>
+
+</Project>

+ 12 - 0
VCommonCore.sln

@@ -46,6 +46,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "@", "@", "{FEA6F92D-28D4-4C
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VCommon.Caching", "VCommon.Caching\VCommon.Caching.csproj", "{3FA626FD-0A30-42EE-A45F-1923544B5042}"
 EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VCommon.PubSub", "VCommon.PubSub\VCommon.PubSub.csproj", "{D6020B9A-955C-4F05-92B6-CD43F1635871}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VCommon.Services", "VCommon.Services\VCommon.Services.csproj", "{2754DB53-E475-484B-B803-BD33E5E30C13}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -124,6 +128,14 @@ Global
 		{3FA626FD-0A30-42EE-A45F-1923544B5042}.Debug|Any CPU.Build.0 = Debug|Any CPU
 		{3FA626FD-0A30-42EE-A45F-1923544B5042}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{3FA626FD-0A30-42EE-A45F-1923544B5042}.Release|Any CPU.Build.0 = Release|Any CPU
+		{D6020B9A-955C-4F05-92B6-CD43F1635871}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{D6020B9A-955C-4F05-92B6-CD43F1635871}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{D6020B9A-955C-4F05-92B6-CD43F1635871}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{D6020B9A-955C-4F05-92B6-CD43F1635871}.Release|Any CPU.Build.0 = Release|Any CPU
+		{2754DB53-E475-484B-B803-BD33E5E30C13}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{2754DB53-E475-484B-B803-BD33E5E30C13}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{2754DB53-E475-484B-B803-BD33E5E30C13}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{2754DB53-E475-484B-B803-BD33E5E30C13}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE