TrustedPeerManager.cs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. using System.Collections.Concurrent;
  2. using System.Net.Sockets;
  3. using System.Security.Cryptography;
  4. using Microsoft.AspNetCore.Connections;
  5. using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
  6. using PCC.App.Networking;
  7. using PCC.App.Security;
  8. using PCC.Common.EventBus;
  9. namespace PCC.App;
  10. // Handshake 1 → Public Key SHA256
  11. // Handshake 2 ← ACK for Yes or No(close connection)
  12. // *Complete Handshake
  13. // SendText 1 → Payload
  14. // SendText 2 ← ACK
  15. // ACK: SHA256(incoming payload)
  16. public abstract class TrustedPeerManager
  17. {
  18. public record TPM_EVT_CMD_INIT(byte[] MyPub, byte[] MyPri);
  19. public record TPM_EVT_CMD_SHUTDOWN;
  20. public record TPM_EVT_PEER_CX(string PeerId);
  21. public record TPM_EVT_PEER_IX(string PeerId);
  22. public record TPM_EVT_PEER_DX(string PeerId);
  23. public record TPM_EVT_PEER_RX(string PeerId, byte[] block);
  24. private readonly IEventBus _eventBus;
  25. private readonly ILogger<TrustedPeerManager> _logger;
  26. private readonly SocketConnectionContextFactory _connectionContextFactory;
  27. private RSA _myPrivateKey;
  28. private ReadOnlyMemory<byte> _myPeerId;
  29. private readonly ConcurrentDictionary<string, PeerInfo> _trustedPeers = new();
  30. private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedPeers = new();
  31. private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _IncomePeers = new();
  32. protected TrustedPeerManager(IEventBus eventBus, ILogger<TrustedPeerManager> logger)
  33. {
  34. _eventBus = eventBus;
  35. _logger = logger;
  36. _connectionContextFactory = new(new(), _logger);
  37. eventBus.Subscript<TPM_EVT_CMD_INIT>(Init);
  38. eventBus.Subscript<TPM_EVT_CMD_SHUTDOWN>(Shutdown);
  39. }
  40. private void Init(TPM_EVT_CMD_INIT obj)
  41. {
  42. _myPrivateKey = RsaUtility.FromPKCS1PrivateKey(obj.MyPri);
  43. _myPeerId = SHA256.HashData(obj.MyPub);
  44. }
  45. private void Shutdown(TPM_EVT_CMD_SHUTDOWN _)
  46. {
  47. foreach (var item in _connectedPeers.Values)
  48. {
  49. item.Disconnect();
  50. }
  51. foreach (var item in _IncomePeers.Values)
  52. {
  53. item.Disconnect();
  54. }
  55. }
  56. public string AddTrustPeer(byte[] tpk, string host, int port)
  57. {
  58. var peerId = Convert.ToHexString(SHA256.HashData(tpk));
  59. var rsa = RsaUtility.FromPKCS1PublicKey(tpk);
  60. _trustedPeers[peerId] = new PeerInfo(rsa, host, port);
  61. return peerId;
  62. }
  63. public bool ConnectToPeerAsync(string peerId)
  64. {
  65. if (_trustedPeers.TryGetValue(peerId, out var peerInfo) == false)
  66. {
  67. return false;
  68. }
  69. _ = Task.Run(async () =>
  70. {
  71. try
  72. {
  73. //Start Peer client
  74. // connect to server
  75. using var sck = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  76. await sck.ConnectAsync(peerInfo.host, peerInfo.port);
  77. var conn = _connectionContextFactory.Create(sck);
  78. var peer = new TcpPeer(conn, sck);
  79. // handshake as client
  80. var ep = new EncryptedTcpPeer(_myPrivateKey, peerInfo.PublicKey, peer);
  81. await ep.SendBlock(_myPeerId); //send handshake 1
  82. var blockAck = await ep.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
  83. if (blockAck?.SequenceEqual(SHA256.HashData(_myPeerId.Span)) != true)
  84. {
  85. peer.Disconnect();
  86. return;
  87. }
  88. // register as connected
  89. //TODO: replace exist connection
  90. _connectedPeers[peerId] = ep;
  91. _eventBus.Publish(new TPM_EVT_PEER_CX(peerId));
  92. while (true) // start RxCycle
  93. {
  94. var rxb = await ep.RxBlockAsync();
  95. if (rxb == null) break;
  96. _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, rxb));
  97. }
  98. }
  99. catch (Exception ex)
  100. {
  101. _logger.LogError(ex, nameof(ConnectToPeerAsync));
  102. }
  103. _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));
  104. });
  105. return true;
  106. }
  107. public async Task HandleIncomingPeerAsync(ConnectionContext connection)
  108. {
  109. string? peerId = null;
  110. try
  111. {
  112. var peer = new TcpPeer(connection, null);
  113. // handshake as server
  114. var eb = await peer.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
  115. if (eb == null) return;
  116. // extract peerId
  117. peerId = Convert.ToHexString(RsaUtility.Decrypt(_myPrivateKey, eb));
  118. // Kick if non trusted
  119. if (_trustedPeers.TryGetValue(peerId, out var peerInfo) == false) return;
  120. // verify signature
  121. var data = RsaUtility.DecryptAndVerifySignature(_myPrivateKey, eb, peerInfo.PublicKey);
  122. var ep = new EncryptedTcpPeer(_myPrivateKey, peerInfo.PublicKey, peer);
  123. await ep.SendBlock(SHA256.HashData(data)); //ACK
  124. // register as income
  125. //TODO: replace exist connection
  126. _IncomePeers[peerId] = ep;
  127. // start RxCycle
  128. while (true) // start RxCycle
  129. {
  130. var rxb = await ep.RxBlockAsync();
  131. if (rxb == null) break;
  132. _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, rxb));
  133. }
  134. }
  135. catch (Exception e)
  136. {
  137. _logger.LogError(e, nameof(HandleIncomingPeerAsync));
  138. }
  139. if (peerId != null) _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));
  140. }
  141. public bool SendToPeer(string peerId, byte[] block)
  142. {
  143. if (_connectedPeers.TryGetValue(peerId, out var peerCon) == false && _IncomePeers.TryGetValue(peerId, out peerCon) == false) return false;
  144. _ = peerCon.SendBlock(block);
  145. return true;
  146. }
  147. private record PeerInfo(RSA PublicKey, string host, int port);
  148. }