TrustedPeerManager.cs 11 KB


  1. using System.Collections.Concurrent;
  2. using System.Net.Sockets;
  3. using System.Security.Cryptography;
  4. using Microsoft.AspNetCore.Connections;
  5. using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
  6. using PCC.App.Networking;
  7. using PCC.App.Security;
  8. using PCC.App.TransferModels;
  9. using PCC.Common.EventBus;
  10. namespace PCC.App;
  11. // Handshake 1 → Public Key SHA256
  12. // Handshake 2 ← ACK for Yes or No(close connection)
  13. // *Complete Handshake
  14. // SendText 1 → Payload
  15. // SendText 2 ← ACK
  16. // ACK: SHA256(incoming payload)
  17. public abstract class TrustedPeerManager
  18. {
  19. private const int NONCE_LENGTH_BYTES = 16;
  20. private const int NONCE_EXPIRE_SECOND = 60;
  21. private const int NONCE_SKEW_SECOND = 30;
  22. public record TPM_EVT_CMD_INIT(byte[] MyPub, byte[] MyPri);
  23. public record TPM_EVT_CMD_SHUTDOWN;
  24. public record TPM_EVT_PEER_IX(string PeerId);
  25. public record TPM_EVT_PEER_CX(string PeerId);
  26. public record TPM_EVT_PEER_DX(string PeerId);
  27. public record TPM_EVT_PEER_RX(string PeerId, DateTimeOffset senderTimestamp, ReadOnlyMemory<byte> payload);
  28. public record TPM_EVT_PEER_TX(string PeerId, DateTimeOffset senderTimestamp, ReadOnlyMemory<byte> payload);
  29. public record TPM_EVT_PEER_XX(XX Kind, string? PeerId);
  30. public enum XX
  31. {
  32. Invalid = 0,
  33. NonTrustPeer,
  34. HandshakeFailServerAckNotMatch,
  35. TimeStampSkew,
  36. ReplayAttackDetected,
  37. }
  38. private readonly IEventBus _eventBus;
  39. private readonly ILogger<TrustedPeerManager> _logger;
  40. private readonly SocketConnectionContextFactory _connectionContextFactory;
  41. private RSA? _myPrivateKey;
  42. private ReadOnlyMemory<byte>? _myPeerId;
  43. private readonly ConcurrentDictionary<string, PeerInfo> _trustedPeers = new();
  44. private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedPeers = new();
  45. private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _IncomePeers = new();
  46. private readonly TimestampNonceManager _nonceManager;
  47. protected TrustedPeerManager(IEventBus eventBus, ILogger<TrustedPeerManager> logger)
  48. {
  49. _eventBus = eventBus;
  50. _logger = logger;
  51. _connectionContextFactory = new(new(), _logger);
  52. _nonceManager = new TimestampNonceManager(NONCE_LENGTH_BYTES - TimestampNonceManager.TimestampLength, TimeSpan.FromMicroseconds(NONCE_EXPIRE_SECOND), TimeSpan.FromSeconds(NONCE_SKEW_SECOND));
  53. eventBus.Subscript<TPM_EVT_CMD_INIT>(Init);
  54. eventBus.Subscript<TPM_EVT_CMD_SHUTDOWN>(Shutdown);
  55. }
  56. private void Init(TPM_EVT_CMD_INIT obj)
  57. {
  58. _myPrivateKey = RsaUtility.FromPKCS1PrivateKey(obj.MyPri);
  59. _myPeerId = SHA256.HashData(obj.MyPub);
  60. }
  61. private void Shutdown(TPM_EVT_CMD_SHUTDOWN _)
  62. {
  63. foreach (var item in _connectedPeers)
  64. {
  65. item.Value.Disconnect();
  66. _connectedPeers.Remove(item.Key, out var _);
  67. }
  68. foreach (var item in _IncomePeers)
  69. {
  70. item.Value.Disconnect();
  71. _connectedPeers.Remove(item.Key, out var _);
  72. }
  73. _nonceManager.Dispose();
  74. }
  75. public string AddPeer(byte[] publicKey, string host, int port)
  76. {
  77. var peerId = Convert.ToHexString(SHA256.HashData(publicKey));
  78. _trustedPeers[peerId] = new PeerInfo(RsaUtility.FromPKCS1PublicKey(publicKey), host, port);
  79. return peerId;
  80. }
  81. public bool RemovePeer(string peerId) => _trustedPeers.Remove(peerId, out _);
  82. public bool ConnectToPeerAsync(string peerId)
  83. {
  84. if (_trustedPeers.TryGetValue(peerId, out var peerInfo) == false)
  85. {
  86. return false;
  87. }
  88. if (_myPrivateKey == null || _myPeerId == null) throw new InvalidOperationException("Not init");
  89. _ = Task.Run(async () =>
  90. {
  91. EncryptedTcpPeer? epLocal = null;
  92. try
  93. {
  94. //Create Peer client and connect to server
  95. {
  96. var sckLocal = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  97. await sckLocal.ConnectAsync(peerInfo.Host, peerInfo.Port);
  98. var conn = _connectionContextFactory.Create(sckLocal);
  99. var tcpPeerLocal = new TcpPeer(conn);
  100. epLocal = new EncryptedTcpPeer(_myPrivateKey, peerInfo.PublicKey, tcpPeerLocal);
  101. // handshake as client
  102. // ☞ tx handshake 1
  103. var (handshake1, _) = _nonceManager.NewNonceWithPayload(_myPeerId.Value);
  104. await epLocal.SendBlockAsync(handshake1);
  105. // ☞ rx handshake 2
  106. var handshake2 = await epLocal.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
  107. if (handshake2 == null) { epLocal.Disconnect(); return; } // timeout? disconnect
  108. var (ok, _, pl) = _nonceManager.CheckValidAndExtractPayload(handshake2);
  109. if (ok != true)
  110. {
  111. PublishXX(peerId, ok switch { null => XX.TimeStampSkew, false => XX.ReplayAttackDetected, _ => XX.Invalid });
  112. epLocal.Disconnect();
  113. return;
  114. }
  115. // handshake not match, disconnect
  116. if (pl.Span.SequenceEqual(SHA256.HashData(handshake1.Span)) != true)
  117. {
  118. PublishXX(peerId, XX.HandshakeFailServerAckNotMatch);
  119. epLocal.Disconnect(); return;
  120. }
  121. }
  122. //replace exist connection
  123. if (_connectedPeers.Remove(peerId, out var exist)) exist.Disconnect();
  124. // register as connected
  125. _connectedPeers[peerId] = epLocal;
  126. _eventBus.Publish(new TPM_EVT_PEER_CX(peerId));
  127. while (true) // start RxCycle
  128. {
  129. var rxb = await epLocal.RxBlockAsync();
  130. if (rxb == null) break;
  131. var (ok, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(rxb);
  132. if (ok != true)
  133. {
  134. PublishXX(peerId, ok switch { null => XX.TimeStampSkew, false => XX.ReplayAttackDetected, _ => XX.Invalid });
  135. epLocal.Disconnect();
  136. break;
  137. }
  138. _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, senderTimestamp, payload));
  139. }
  140. }
  141. catch (ConnectionAbortedException)
  142. {
  143. //ignore
  144. }
  145. catch (Exception ex)
  146. {
  147. _logger.LogError(ex, nameof(ConnectToPeerAsync));
  148. }
  149. _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));
  150. // unregister from connected
  151. _connectedPeers.Remove(peerId, out _);
  152. epLocal?.Disconnect();
  153. });
  154. return true;
  155. }
  156. public async Task HandleIncomingPeerAsync(ConnectionContext connection)
  157. {
  158. if (_myPrivateKey == null) throw new InvalidOperationException("Not init");
  159. string? peerId = null;
  160. try
  161. {
  162. // handshake as server
  163. EncryptedTcpPeer epRemote;
  164. {
  165. var tpRemote = new TcpPeer(connection);
  166. //rx handshake1
  167. PeerInfo peerInfo;
  168. byte[] payloadOfHandshake1;
  169. {
  170. var receivedBlock = await tpRemote.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
  171. if (receivedBlock == null) return;
  172. var handshake1 = RsaUtility.Decrypt(_myPrivateKey, receivedBlock);
  173. var (ok, _, payload) = _nonceManager.CheckValidAndExtractPayload(handshake1);
  174. if (ok != true)
  175. {
  176. PublishXX(null, ok switch { null => XX.TimeStampSkew, false => XX.ReplayAttackDetected, _ => XX.Invalid });
  177. tpRemote.Disconnect();
  178. return;
  179. }
  180. peerId = Convert.ToHexString(payload.Span);
  181. // Kick if non-trusted
  182. if (_trustedPeers.TryGetValue(peerId, out peerInfo!) == false)
  183. {
  184. PublishXX(peerId, XX.NonTrustPeer);
  185. return;
  186. }
  187. // verify signature
  188. payloadOfHandshake1 = RsaUtility.DecryptAndVerifySignature(_myPrivateKey, receivedBlock, peerInfo.PublicKey);
  189. }
  190. //tx handshake2
  191. {
  192. epRemote = new EncryptedTcpPeer(_myPrivateKey, peerInfo.PublicKey, tpRemote);
  193. var payloadOfHandshake2 = SHA256.HashData(payloadOfHandshake1);
  194. var (handshake2, _) = _nonceManager.NewNonceWithPayload(payloadOfHandshake2);
  195. await epRemote.SendBlockAsync(handshake2);
  196. }
  197. }
  198. //replace exist connection
  199. if (_IncomePeers.Remove(peerId, out var exist)) exist.Disconnect();
  200. // register as income
  201. _IncomePeers[peerId] = epRemote;
  202. // start RxCycle
  203. while (true)
  204. {
  205. var receivedBlock = await epRemote.RxBlockAsync();
  206. if (receivedBlock == null) break;
  207. var (ok, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(receivedBlock);
  208. if (ok != true)
  209. {
  210. PublishXX(peerId, ok switch { null => XX.TimeStampSkew, false => XX.ReplayAttackDetected, _ => XX.Invalid });
  211. break;
  212. }
  213. _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, senderTimestamp, payload));
  214. }
  215. }
  216. catch (ConnectionAbortedException)
  217. {
  218. //ignore
  219. }
  220. catch (Exception e)
  221. {
  222. _logger.LogError(e, nameof(HandleIncomingPeerAsync));
  223. }
  224. if (peerId != null)
  225. {
  226. _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));
  227. // unregister from income
  228. if (_IncomePeers.Remove(peerId, out var peer))
  229. {
  230. peer.Disconnect();
  231. }
  232. }
  233. }
  234. private void PublishXX(string? peerId, XX kind) => _eventBus.Publish(new TPM_EVT_PEER_XX(kind, peerId));
  235. public bool SendToPeer(string peerId, byte[] payload)
  236. {
  237. if (_connectedPeers.TryGetValue(peerId, out var peerCon) == false && _IncomePeers.TryGetValue(peerId, out peerCon) == false) return false;
  238. Task.Run(async () =>
  239. {
  240. var (payloadAndNonce, timestamp) = _nonceManager.NewNonceWithPayload(payload);
  241. await peerCon.SendBlockAsync(payloadAndNonce);
  242. _eventBus.Publish(new TPM_EVT_PEER_TX(peerId, timestamp, payload));
  243. });
  244. return true;
  245. }
  246. private record PeerInfo(RSA PublicKey, string Host, int Port);
  247. }