LocalPeerManager.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. using System.Collections.Concurrent;
  2. using System.Net.Sockets;
  3. using System.Security.Cryptography;
  4. using Microsoft.AspNetCore.Connections;
  5. using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
  6. using PCC.App.Networking;
  7. using PCC.App.Security;
  8. using PCC.Common.EventBus;
  9. using PCC.Common.Networking;
  10. namespace PCC.App.Tpm;
  11. internal class LocalPeerManager
  12. {
  13. private readonly KestrelTcpServer _listener;
  14. private readonly SocketConnectionContextFactory _sckConnectionContextFactory;
  15. private readonly TimestampNonceManager _nonceManager;
  16. private readonly IEventBus _eventBus;
  17. private readonly ILocalPeerInfo _localPeer;
  18. private readonly ConcurrentDictionary<string, IRemotePeerInfo> _trustedRemotePeers;
  19. private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedInbound = new();
  20. private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedOutbound = new();
  21. public LocalPeerManager(SocketConnectionContextFactory sckConnectionContextFactory, TimestampNonceManager nonceManager, ILocalPeerInfo localPeer, IEventBus eventBus, ILogger<TrustedPeerManager> logger, ILogger<KestrelTcpServer> ktsLogger)
  22. {
  23. _sckConnectionContextFactory = sckConnectionContextFactory;
  24. _nonceManager = nonceManager;
  25. _localPeer = localPeer;
  26. _eventBus = eventBus;
  27. _listener = new(localPeer.Address, localPeer.Port, HandleInboundConnection, ktsLogger);
  28. _trustedRemotePeers = new(localPeer.TrustedRemotePeers.ToDictionary(p => p.PeerId));
  29. }
  30. private async Task HandleInboundConnection(ConnectionContext connection)
  31. {
  32. _eventBus.Publish(new TPM_EVT_INBOUND_CON_ACCEPTED(_localPeer.PeerId, connection.ConnectionId, connection.RemoteEndPoint));
  33. string? remotePeerId = null;
  34. EncryptedTcpPeer? epRemote = null;
  35. try
  36. {
  37. // handshake as server
  38. {
  39. var tpRemote = new TcpPeer(connection);
  40. //rx handshake1
  41. IRemotePeerInfo peerPeerInfo;
  42. byte[] payloadOfHandshake1;
  43. {
  44. var receivedBlock = await tpRemote.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
  45. if (receivedBlock == null) return;
  46. var handshake1 = RsaUtility.Decrypt(_localPeer.PrivateKey, receivedBlock);
  47. var (nonceResult, _, payload) = _nonceManager.CheckValidAndExtractPayload(handshake1);
  48. if (nonceResult != TimestampNonceResult.OK)
  49. {
  50. if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
  51. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connection.ConnectionId, null));
  52. if (nonceResult == TimestampNonceResult.TimestampSkew)
  53. {
  54. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connection.ConnectionId, null));
  55. }
  56. return;
  57. }
  58. remotePeerId = Convert.ToHexString(payload.Span);
  59. // Kick if non-trusted
  60. if (_trustedRemotePeers.TryGetValue(remotePeerId, out peerPeerInfo!) == false)
  61. {
  62. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_FAIL_NOT_TRUSTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
  63. return;
  64. }
  65. // verify signature
  66. payloadOfHandshake1 = RsaUtility.DecryptAndVerifySignature(_localPeer.PrivateKey, receivedBlock, peerPeerInfo.PublicKey);
  67. }
  68. //tx handshake2
  69. {
  70. epRemote = new EncryptedTcpPeer(_localPeer.PrivateKey, peerPeerInfo.PublicKey, tpRemote);
  71. var payloadOfHandshake2 = SHA256.HashData(payloadOfHandshake1);
  72. var (handshake2, _) = _nonceManager.NewNonceWithPayload(payloadOfHandshake2);
  73. await epRemote.SendBlockAsync(handshake2);
  74. }
  75. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
  76. }
  77. //replace exist connection
  78. if (_connectedInbound.Remove(remotePeerId, out var exist)) exist.Disconnect();
  79. _connectedInbound[remotePeerId] = epRemote;
  80. // start RxCycle
  81. while (true)
  82. {
  83. var receivedBlock = await epRemote.RxBlockAsync();
  84. if (receivedBlock == null) break;
  85. var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(receivedBlock);
  86. if (nonceResult != TimestampNonceResult.OK)
  87. {
  88. if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
  89. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
  90. if (nonceResult == TimestampNonceResult.TimestampSkew)
  91. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
  92. break;
  93. }
  94. _eventBus.Publish(new TPM_EVT_INBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload));
  95. }
  96. }
  97. catch (Exception exception)
  98. {
  99. _eventBus.Publish(new TPM_EVT_INBOUND_CON_ERROR(_localPeer.PeerId, connection.ConnectionId, remotePeerId, exception));
  100. }
  101. finally
  102. {
  103. _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
  104. epRemote?.Disconnect();
  105. if (remotePeerId != null) _connectedInbound.Remove(remotePeerId, out _);
  106. }
  107. }
  108. public bool ConnectToRemotePeer(string remotePeerId)
  109. {
  110. if (_trustedRemotePeers.TryGetValue(remotePeerId, out var peerInfo) == false) return false;
  111. _ = Task.Run(async () =>
  112. {
  113. EncryptedTcpPeer? epOutbound = null;
  114. try
  115. {
  116. //Create Peer client and connect to server
  117. {
  118. var sckOutbound = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  119. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.CONNECTION_ATTEMPT, _localPeer.PeerId, remotePeerId));
  120. await sckOutbound.ConnectAsync(peerInfo.Host, peerInfo.Port);
  121. var connOutbound = _sckConnectionContextFactory.Create(sckOutbound);
  122. var tcpPeerOutbound = new TcpPeer(connOutbound);
  123. epOutbound = new EncryptedTcpPeer(_localPeer.PrivateKey, peerInfo.PublicKey, tcpPeerOutbound);
  124. // handshake as client
  125. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_ATTEMPT, _localPeer.PeerId, remotePeerId));
  126. // ☞ tx handshake 1
  127. var (handshake1, _) = _nonceManager.NewNonceWithPayload(Convert.FromHexString(_localPeer.PeerId));
  128. await epOutbound.SendBlockAsync(handshake1);
  129. // ☞ rx handshake 2
  130. var handshake2 = await epOutbound.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
  131. if (handshake2 == null)
  132. {
  133. return;
  134. } // timeout or disconnect
  135. var (nonceResult, _, pl) = _nonceManager.CheckValidAndExtractPayload(handshake2);
  136. if (nonceResult != TimestampNonceResult.OK)
  137. {
  138. if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
  139. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId));
  140. if (nonceResult == TimestampNonceResult.TimestampSkew)
  141. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId));
  142. epOutbound.Disconnect();
  143. return;
  144. }
  145. // handshake not match, disconnect
  146. if (pl.Span.SequenceEqual(SHA256.HashData(handshake1.Span)) != true)
  147. {
  148. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_FAIL_ACK_NOT_MATCHED, _localPeer.PeerId, remotePeerId));
  149. return;
  150. }
  151. }
  152. //replace exist connection
  153. if (_connectedOutbound.Remove(remotePeerId, out var exist)) exist.Disconnect();
  154. _connectedOutbound[remotePeerId] = epOutbound;
  155. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, remotePeerId));
  156. while (true) // start RxCycle
  157. {
  158. var rxb = await epOutbound.RxBlockAsync();
  159. if (rxb == null) break;
  160. var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(rxb);
  161. if (nonceResult != TimestampNonceResult.OK)
  162. {
  163. if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
  164. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId));
  165. if (nonceResult == TimestampNonceResult.TimestampSkew)
  166. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId));
  167. break;
  168. }
  169. _eventBus.Publish(new TPM_EVT_OUTBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload));
  170. }
  171. }
  172. catch (Exception ex)
  173. {
  174. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_ERROR(_localPeer.PeerId, remotePeerId, ex));
  175. }
  176. finally
  177. {
  178. _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, remotePeerId));
  179. _connectedOutbound.Remove(remotePeerId, out _);
  180. epOutbound?.Disconnect();
  181. }
  182. });
  183. return true;
  184. }
  185. public bool DisconnectRemotePeer(string remotePeerId)
  186. {
  187. var flag = false;
  188. if (_connectedInbound.Remove(remotePeerId, out var inbound))
  189. {
  190. flag = true;
  191. inbound.Disconnect();
  192. }
  193. if (_connectedOutbound.Remove(remotePeerId, out var outbound))
  194. {
  195. flag = true;
  196. outbound.Disconnect();
  197. }
  198. return flag;
  199. }
  200. public void AddOrReplaceRemotePeer(IRemotePeerInfo peerInfo)
  201. {
  202. RemoveRemotePeer(peerInfo.PeerId);
  203. _trustedRemotePeers[peerInfo.PeerId] = peerInfo;
  204. }
  205. public bool RemoveRemotePeer(string remotePeerId)
  206. {
  207. DisconnectRemotePeer(remotePeerId);
  208. return _trustedRemotePeers.Remove(remotePeerId, out _);
  209. }
  210. public void Start()
  211. {
  212. Task.Run(async () =>
  213. {
  214. try
  215. {
  216. _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STARTING));
  217. await _listener.StartAsync();
  218. _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STARTED));
  219. }
  220. catch (Exception ex)
  221. {
  222. _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.START_FAIL, ex));
  223. }
  224. });
  225. }
  226. public void Stop()
  227. {
  228. Task.Run(async () =>
  229. {
  230. try
  231. {
  232. _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STOPPING));
  233. await _listener.StopAsync();
  234. _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STOPPED));
  235. }
  236. catch (Exception ex)
  237. {
  238. _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STOP_FAIL, ex));
  239. }
  240. });
  241. foreach (var item in _connectedInbound) item.Value.Disconnect();
  242. _connectedInbound.Clear();
  243. foreach (var item in _connectedOutbound) item.Value.Disconnect();
  244. _connectedOutbound.Clear();
  245. }
  246. public bool SendToRemotePeer(string remotePeerId, byte[] payload)
  247. {
  248. bool isOutbound;
  249. if (_connectedOutbound.TryGetValue(remotePeerId, out var remote) == false)
  250. {
  251. if (_connectedInbound.TryGetValue(remotePeerId, out remote))
  252. isOutbound = false;
  253. else
  254. return false;
  255. }
  256. else
  257. {
  258. isOutbound = true;
  259. }
  260. Task.Run(async () =>
  261. {
  262. var (payloadAndNonce, timestamp) = _nonceManager.NewNonceWithPayload(payload);
  263. await remote.SendBlockAsync(payloadAndNonce);
  264. if (isOutbound)
  265. {
  266. _eventBus.Publish(new TPM_EVT_OUTBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload));
  267. }
  268. else
  269. {
  270. _eventBus.Publish(new TPM_EVT_INBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload));
  271. }
  272. });
  273. return true;
  274. }
  275. }