LocalPeerManager.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. using System.Collections.Concurrent;
  2. using System.Net.Sockets;
  3. using System.Security.Cryptography;
  4. using Microsoft.AspNetCore.Connections;
  5. using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
  6. using PCC.App.Networking;
  7. using PCC.App.Security;
  8. using PCC.Common.EventBus;
  9. using PCC.Common.Networking;
  10. namespace PCC.App.Tpm;
  /// <summary>
  /// Runs the local peer's TCP listener and keeps track of the encrypted connections
  /// (inbound and outbound) to trusted remote peers. Connection life-cycle changes,
  /// received blocks and sent blocks are reported through the event bus rather than
  /// through return values.
  /// </summary>
  internal class LocalPeerManager
  {
      /// <summary>Time allowed for the other side to deliver its handshake block.</summary>
      private static readonly TimeSpan HandshakeTimeout = TimeSpan.FromSeconds(10);

      private readonly KestrelTcpServer _listener;
      private readonly SocketConnectionContextFactory _sckConnectionContextFactory;
      private readonly TimestampNonceManager _nonceManager;
      private readonly IEventBus _eventBus;
      private readonly ILocalPeerInfo _localPeer;

      // Peers we are willing to talk to, keyed by peer id.
      private readonly ConcurrentDictionary<string, IRemotePeerInfo> _trustedRemotePeers;

      // Live connections keyed by remote peer id; at most one per direction and peer.
      private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedInbound = new();
      private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedOutbound = new();

      /// <summary>
      /// Creates the manager and its listener (the listener is not started until <see cref="Start"/>).
      /// The trusted-peer table is seeded from <c>localPeer.TrustedRemotePeers</c>.
      /// </summary>
      public LocalPeerManager(SocketConnectionContextFactory sckConnectionContextFactory, TimestampNonceManager nonceManager, ILocalPeerInfo localPeer, IEventBus eventBus, ILogger<TrustedPeerManager> logger)
      {
          _sckConnectionContextFactory = sckConnectionContextFactory;
          _nonceManager = nonceManager;
          _localPeer = localPeer;
          _eventBus = eventBus;
          // NOTE(review): `logger` is typed ILogger<TrustedPeerManager> but is cast to
          // ILogger<KestrelTcpServer>. Unless the supplied instance also implements
          // ILogger<KestrelTcpServer>, this cast throws InvalidCastException at runtime.
          // Left unchanged because a real fix requires changing the constructor signature
          // (e.g. injecting ILoggerFactory) - confirm with the DI registration.
          _listener = new(localPeer.Address, localPeer.Port, HandleInboundConnection, (ILogger<KestrelTcpServer>)logger);
          _trustedRemotePeers = new(localPeer.TrustedRemotePeers.ToDictionary(p => p.PeerId));
      }

      /// <summary>
      /// Listener callback: performs the server side of the handshake on an accepted
      /// connection, registers the peer as the inbound connection for its id and then
      /// pumps received blocks onto the event bus until the connection ends.
      /// </summary>
      /// <param name="connection">The accepted connection.</param>
      private async Task HandleInboundConnection(ConnectionContext connection)
      {
          _eventBus.Publish(new TPM_EVT_INBOUND_CON_ACCEPTED(_localPeer.PeerId, connection.ConnectionId, connection.RemoteEndPoint));
          string? remotePeerId = null;
          EncryptedTcpPeer? epRemote = null;
          try
          {
              // handshake as server
              {
                  var tpRemote = new TcpPeer(connection);

                  // rx handshake 1
                  IRemotePeerInfo peerInfo;
                  byte[] payloadOfHandshake1;
                  {
                      // Dispose the timeout source as soon as the handshake block has been handled.
                      using var handshakeTimeout = new CancellationTokenSource(HandshakeTimeout);
                      var receivedBlock = await tpRemote.RxBlockAsync(handshakeTimeout.Token);
                      if (receivedBlock == null)
                      {
                          return;
                      }

                      var handshake1 = RsaUtility.Decrypt(_localPeer.PrivateKey, receivedBlock);
                      var (nonceResult, _, payload) = _nonceManager.CheckValidAndExtractPayload(handshake1);
                      if (nonceResult != TimestampNonceResult.OK)
                      {
                          PublishInboundNonceRejection(nonceResult, connection.ConnectionId, null);
                          return;
                      }

                      // The id is only *claimed* at this point; the signature is checked below.
                      remotePeerId = Convert.ToHexString(payload.Span);

                      // Kick if non-trusted
                      if (!_trustedRemotePeers.TryGetValue(remotePeerId, out peerInfo!))
                      {
                          _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_FAIL_NOT_TRUSTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
                          return;
                      }

                      // verify signature (needs the claimed peer's public key, hence the second pass)
                      payloadOfHandshake1 = RsaUtility.DecryptAndVerifySignature(_localPeer.PrivateKey, receivedBlock, peerInfo.PublicKey);
                  }

                  // tx handshake 2: acknowledge with the SHA-256 of what we received
                  {
                      epRemote = new EncryptedTcpPeer(_localPeer.PrivateKey, peerInfo.PublicKey, tpRemote);
                      var payloadOfHandshake2 = SHA256.HashData(payloadOfHandshake1);
                      var (handshake2, _) = _nonceManager.NewNonceWithPayload(payloadOfHandshake2);
                      await epRemote.SendBlockAsync(handshake2);
                  }

                  _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
              }

              // replace existing connection
              if (_connectedInbound.TryRemove(remotePeerId, out var previous))
              {
                  previous.Disconnect();
              }
              _connectedInbound[remotePeerId] = epRemote;

              // start RxCycle
              while (true)
              {
                  var receivedBlock = await epRemote.RxBlockAsync();
                  if (receivedBlock == null)
                  {
                      break;
                  }

                  var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(receivedBlock);
                  if (nonceResult != TimestampNonceResult.OK)
                  {
                      PublishInboundNonceRejection(nonceResult, connection.ConnectionId, remotePeerId);
                      break;
                  }

                  _eventBus.Publish(new TPM_EVT_INBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload));
              }
          }
          catch (Exception exception)
          {
              _eventBus.Publish(new TPM_EVT_INBOUND_CON_ERROR(_localPeer.PeerId, connection.ConnectionId, remotePeerId, exception));
          }
          finally
          {
              _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
              epRemote?.Disconnect();

              // Remove only the entry that holds *this* connection. Removing by key alone
              // would also drop a newer connection that replaced us, or the live connection
              // of a peer whose id was merely claimed by a handshake that failed.
              if (remotePeerId != null && epRemote != null)
              {
                  _connectedInbound.TryRemove(KeyValuePair.Create(remotePeerId, epRemote));
              }
          }
      }

      /// <summary>
      /// Starts a background attempt to connect to a trusted remote peer.
      /// </summary>
      /// <param name="remotePeerId">Id of the peer to connect to.</param>
      /// <returns>
      /// <c>false</c> if the peer is not in the trusted table; <c>true</c> if the attempt was
      /// scheduled. <c>true</c> does not mean the connection succeeded - progress and failures
      /// are published on the event bus.
      /// </returns>
      public bool ConnectToRemotePeer(string remotePeerId)
      {
          if (!_trustedRemotePeers.TryGetValue(remotePeerId, out var peerInfo))
          {
              return false;
          }

          _ = Task.Run(() => RunOutboundConnectionAsync(remotePeerId, peerInfo));
          return true;
      }

      /// <summary>
      /// Connects to <paramref name="peerInfo"/>, performs the client side of the handshake,
      /// registers the connection as the outbound connection for the peer and pumps received
      /// blocks onto the event bus until the connection ends. Exceptions are reported as
      /// <c>TPM_EVT_OUTBOUND_CON_ERROR</c> events.
      /// </summary>
      private async Task RunOutboundConnectionAsync(string remotePeerId, IRemotePeerInfo peerInfo)
      {
          Socket? sckOutbound = null;
          EncryptedTcpPeer? epOutbound = null;
          try
          {
              // Create peer client and connect to server
              {
                  sckOutbound = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                  _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.CONNECTION_ATTEMPT, _localPeer.PeerId, remotePeerId));
                  await sckOutbound.ConnectAsync(peerInfo.Host, peerInfo.Port);
                  var connOutbound = _sckConnectionContextFactory.Create(sckOutbound);
                  var tcpPeerOutbound = new TcpPeer(connOutbound);
                  epOutbound = new EncryptedTcpPeer(_localPeer.PrivateKey, peerInfo.PublicKey, tcpPeerOutbound);

                  // handshake as client
                  _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_ATTEMPT, _localPeer.PeerId, remotePeerId));

                  // tx handshake 1: our own peer id wrapped with a fresh nonce
                  var (handshake1, _) = _nonceManager.NewNonceWithPayload(Convert.FromHexString(_localPeer.PeerId));
                  await epOutbound.SendBlockAsync(handshake1);

                  // rx handshake 2
                  using var handshakeTimeout = new CancellationTokenSource(HandshakeTimeout);
                  var handshake2 = await epOutbound.RxBlockAsync(handshakeTimeout.Token);
                  if (handshake2 == null)
                  {
                      return; // timeout or disconnect
                  }

                  var (nonceResult, _, pl) = _nonceManager.CheckValidAndExtractPayload(handshake2);
                  if (nonceResult != TimestampNonceResult.OK)
                  {
                      PublishOutboundNonceRejection(nonceResult, remotePeerId);
                      epOutbound.Disconnect();
                      return;
                  }

                  // The acknowledgement must be the SHA-256 of the block we sent; otherwise disconnect.
                  if (!pl.Span.SequenceEqual(SHA256.HashData(handshake1.Span)))
                  {
                      _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_FAIL_ACK_NOT_MATCHED, _localPeer.PeerId, remotePeerId));
                      return;
                  }

                  _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, remotePeerId));
              }

              // replace existing connection
              if (_connectedOutbound.TryRemove(remotePeerId, out var previous))
              {
                  previous.Disconnect();
              }
              _connectedOutbound[remotePeerId] = epOutbound;

              // start RxCycle
              while (true)
              {
                  var rxb = await epOutbound.RxBlockAsync();
                  if (rxb == null)
                  {
                      break;
                  }

                  var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(rxb);
                  if (nonceResult != TimestampNonceResult.OK)
                  {
                      PublishOutboundNonceRejection(nonceResult, remotePeerId);
                      break;
                  }

                  _eventBus.Publish(new TPM_EVT_OUTBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload));
              }
          }
          catch (Exception ex)
          {
              _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_ERROR(_localPeer.PeerId, remotePeerId, ex));
          }
          finally
          {
              _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, remotePeerId));
              if (epOutbound != null)
              {
                  // Remove only our own entry: a failed or superseded attempt must not evict
                  // another connection registered under the same peer id.
                  _connectedOutbound.TryRemove(KeyValuePair.Create(remotePeerId, epOutbound));
                  epOutbound.Disconnect();
              }
              else
              {
                  // No encrypted peer was created (e.g. connect failed), so nothing else
                  // will release the socket.
                  sckOutbound?.Dispose();
              }
          }
      }

      /// <summary>
      /// Drops the inbound and/or outbound connection to the given peer, if any.
      /// </summary>
      /// <returns><c>true</c> if at least one connection was found and disconnected.</returns>
      public bool DisconnectRemotePeer(string remotePeerId)
      {
          var disconnectedAny = false;
          if (_connectedInbound.TryRemove(remotePeerId, out var inbound))
          {
              disconnectedAny = true;
              inbound.Disconnect();
          }
          if (_connectedOutbound.TryRemove(remotePeerId, out var outbound))
          {
              disconnectedAny = true;
              outbound.Disconnect();
          }
          return disconnectedAny;
      }

      /// <summary>Adds a peer to the trusted table, replacing any entry with the same id.</summary>
      public void AddOrReplaceRemotePeer(IRemotePeerInfo peerInfo)
      {
          _trustedRemotePeers[peerInfo.PeerId] = peerInfo;
      }

      /// <summary>
      /// Removes a peer from the trusted table and drops its connections.
      /// </summary>
      /// <returns><c>true</c> if the peer was in the trusted table.</returns>
      public bool RemoveRemotePeer(string remotePeerId)
      {
          // Revoke trust first so the peer cannot pass the trust check of a new handshake
          // started right after we disconnect it.
          var wasTrusted = _trustedRemotePeers.TryRemove(remotePeerId, out _);
          DisconnectRemotePeer(remotePeerId);
          return wasTrusted;
      }

      /// <summary>
      /// Starts the listener in the background. The outcome is published as a
      /// <c>TPM_EVT_LISTENER_STATUS_CHANGED</c> event (STARTING, then STARTED or START_FAIL).
      /// </summary>
      public void Start()
      {
          _ = Task.Run(async () =>
          {
              try
              {
                  _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STARTING));
                  await _listener.StartAsync();
                  _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STARTED));
              }
              catch (Exception ex)
              {
                  _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.START_FAIL, ex));
              }
          });
      }

      /// <summary>
      /// Stops the listener in the background (outcome published as STOPPING, then STOPPED or
      /// STOP_FAIL) and disconnects every currently registered connection.
      /// </summary>
      public void Stop()
      {
          _ = Task.Run(async () =>
          {
              try
              {
                  _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STOPPING));
                  await _listener.StopAsync();
                  _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STOPPED));
              }
              catch (Exception ex)
              {
                  _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STOP_FAIL, ex));
              }
          });

          DisconnectAll(_connectedInbound);
          DisconnectAll(_connectedOutbound);
      }

      /// <summary>
      /// Queues a payload for sending to a connected peer, preferring the outbound connection
      /// and falling back to the inbound one.
      /// </summary>
      /// <returns>
      /// <c>false</c> if there is no connection to the peer; <c>true</c> if the send was
      /// scheduled. The TX event is published only after the block has been sent.
      /// </returns>
      public bool SendToRemotePeer(string remotePeerId, byte[] payload)
      {
          bool isOutbound;
          if (_connectedOutbound.TryGetValue(remotePeerId, out var remote))
          {
              isOutbound = true;
          }
          else if (_connectedInbound.TryGetValue(remotePeerId, out remote))
          {
              isOutbound = false;
          }
          else
          {
              return false;
          }

          // NOTE(review): the send is fire-and-forget, so an exception thrown while building
          // or sending the block is never observed by the caller or published as an event.
          _ = Task.Run(async () =>
          {
              var (payloadAndNonce, timestamp) = _nonceManager.NewNonceWithPayload(payload);
              await remote.SendBlockAsync(payloadAndNonce);
              if (isOutbound)
              {
                  _eventBus.Publish(new TPM_EVT_OUTBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload));
              }
              else
              {
                  _eventBus.Publish(new TPM_EVT_INBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload));
              }
          });
          return true;
      }

      /// <summary>Publishes the inbound status event matching a failed nonce check, if one is defined for it.</summary>
      private void PublishInboundNonceRejection(TimestampNonceResult nonceResult, string connectionId, string? remotePeerId)
      {
          if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
          {
              _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connectionId, remotePeerId));
          }
          if (nonceResult == TimestampNonceResult.TimestampSkew)
          {
              _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connectionId, remotePeerId));
          }
      }

      /// <summary>Publishes the outbound status event matching a failed nonce check, if one is defined for it.</summary>
      private void PublishOutboundNonceRejection(TimestampNonceResult nonceResult, string remotePeerId)
      {
          if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
          {
              _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId));
          }
          if (nonceResult == TimestampNonceResult.TimestampSkew)
          {
              _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId));
          }
      }

      /// <summary>
      /// Removes and disconnects every entry of a connection table. Each entry is removed
      /// before it is disconnected, so a connection registered concurrently is either
      /// disconnected here or left in the table - never dropped from the table while still open.
      /// </summary>
      private static void DisconnectAll(ConcurrentDictionary<string, EncryptedTcpPeer> peers)
      {
          foreach (var peerId in peers.Keys)
          {
              if (peers.TryRemove(peerId, out var peer))
              {
                  peer.Disconnect();
              }
          }
      }
  }