using System.Collections.Concurrent; using System.Net.Sockets; using System.Security.Cryptography; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; using PCC.App.Networking; using PCC.App.Security; using PCC.Common.EventBus; using PCC.Common.Networking; namespace PCC.App.Tpm; internal class LocalPeerManager { private readonly KestrelTcpServer _listener; private readonly SocketConnectionContextFactory _sckConnectionContextFactory; private readonly TimestampNonceManager _nonceManager; private readonly IEventBus _eventBus; private readonly ILocalPeerInfo _localPeer; private readonly ConcurrentDictionary _trustedRemotePeers; private readonly ConcurrentDictionary _connectedInbound = new(); private readonly ConcurrentDictionary _connectedOutbound = new(); public LocalPeerManager(SocketConnectionContextFactory sckConnectionContextFactory, TimestampNonceManager nonceManager, ILocalPeerInfo localPeer, IEventBus eventBus, ILogger logger) { _sckConnectionContextFactory = sckConnectionContextFactory; _nonceManager = nonceManager; _localPeer = localPeer; _eventBus = eventBus; _listener = new(localPeer.Address, localPeer.Port, HandleInboundConnection, (ILogger)logger); _trustedRemotePeers = new(localPeer.TrustedRemotePeers.ToDictionary(p => p.PeerId)); } private async Task HandleInboundConnection(ConnectionContext connection) { _eventBus.Publish(new TPM_EVT_INBOUND_CON_ACCEPTED(_localPeer.PeerId, connection.ConnectionId, connection.RemoteEndPoint)); string? remotePeerId = null; EncryptedTcpPeer? epRemote = null; try { // handshake as server { var tpRemote = new TcpPeer(connection); //rx handshake1 IRemotePeerInfo peerPeerInfo; byte[] payloadOfHandshake1; { var receivedBlock = await tpRemote.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token); if (receivedBlock == null) return; var handshake1 = RsaUtility.Decrypt(_localPeer.PrivateKey, receivedBlock); var (nonceResult, _, payload) = _nonceManager.CheckValidAndExtractPayload(handshake1); if (nonceResult != TimestampNonceResult.OK) { if (nonceResult == TimestampNonceResult.ReplayAttackDetected) _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connection.ConnectionId, null)); if (nonceResult == TimestampNonceResult.TimestampSkew) _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connection.ConnectionId, null)); return; } remotePeerId = Convert.ToHexString(payload.Span); // Kick if non-trusted if (_trustedRemotePeers.TryGetValue(remotePeerId, out peerPeerInfo!) == false) { _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_FAIL_NOT_TRUSTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId)); return; } // verify signature payloadOfHandshake1 = RsaUtility.DecryptAndVerifySignature(_localPeer.PrivateKey, receivedBlock, peerPeerInfo.PublicKey); } //tx handshake2 { epRemote = new EncryptedTcpPeer(_localPeer.PrivateKey, peerPeerInfo.PublicKey, tpRemote); var payloadOfHandshake2 = SHA256.HashData(payloadOfHandshake1); var (handshake2, _) = _nonceManager.NewNonceWithPayload(payloadOfHandshake2); await epRemote.SendBlockAsync(handshake2); } _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, connection.ConnectionId, remotePeerId)); } //replace exist connection if (_connectedInbound.Remove(remotePeerId, out var exist)) exist.Disconnect(); _connectedInbound[remotePeerId] = epRemote; // start RxCycle while (true) { var receivedBlock = await epRemote.RxBlockAsync(); if (receivedBlock == null) break; var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(receivedBlock); if (nonceResult != TimestampNonceResult.OK) { if (nonceResult == TimestampNonceResult.ReplayAttackDetected) _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connection.ConnectionId, remotePeerId)); if (nonceResult == TimestampNonceResult.TimestampSkew) _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connection.ConnectionId, remotePeerId)); break; } _eventBus.Publish(new TPM_EVT_INBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload)); } } catch (Exception exception) { _eventBus.Publish(new TPM_EVT_INBOUND_CON_ERROR(_localPeer.PeerId, connection.ConnectionId, remotePeerId, exception)); } finally { _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId)); epRemote?.Disconnect(); if (remotePeerId != null) _connectedInbound.Remove(remotePeerId, out _); } } public bool ConnectToRemotePeer(string remotePeerId) { if (_trustedRemotePeers.TryGetValue(remotePeerId, out var peerInfo) == false) return false; _ = Task.Run(async () => { EncryptedTcpPeer? epOutbound = null; try { //Create Peer client and connect to server { var sckOutbound = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.CONNECTION_ATTEMPT, _localPeer.PeerId, remotePeerId)); await sckOutbound.ConnectAsync(peerInfo.Host, peerInfo.Port); var connOutbound = _sckConnectionContextFactory.Create(sckOutbound); var tcpPeerOutbound = new TcpPeer(connOutbound); epOutbound = new EncryptedTcpPeer(_localPeer.PrivateKey, peerInfo.PublicKey, tcpPeerOutbound); // handshake as client _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_ATTEMPT, _localPeer.PeerId, remotePeerId)); // ☞ tx handshake 1 var (handshake1, _) = _nonceManager.NewNonceWithPayload(Convert.FromHexString(_localPeer.PeerId)); await epOutbound.SendBlockAsync(handshake1); // ☞ rx handshake 2 var handshake2 = await epOutbound.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token); if (handshake2 == null) { return; } // timeout or disconnect var (nonceResult, _, pl) = _nonceManager.CheckValidAndExtractPayload(handshake2); if (nonceResult != TimestampNonceResult.OK) { if (nonceResult == TimestampNonceResult.ReplayAttackDetected) _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId)); if (nonceResult == TimestampNonceResult.TimestampSkew) _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId)); epOutbound.Disconnect(); return; } // handshake not match, disconnect if (pl.Span.SequenceEqual(SHA256.HashData(handshake1.Span)) != true) { _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_FAIL_ACK_NOT_MATCHED, _localPeer.PeerId, remotePeerId)); return; } _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, remotePeerId)); } //replace exist connection if (_connectedOutbound.Remove(remotePeerId, out var exist)) exist.Disconnect(); _connectedOutbound[remotePeerId] = epOutbound; while (true) // start RxCycle { var rxb = await epOutbound.RxBlockAsync(); if (rxb == null) break; var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(rxb); if (nonceResult != TimestampNonceResult.OK) { if (nonceResult == TimestampNonceResult.ReplayAttackDetected) _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId)); if (nonceResult == TimestampNonceResult.TimestampSkew) _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId)); break; } _eventBus.Publish(new TPM_EVT_OUTBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload)); } } catch (Exception ex) { _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_ERROR(_localPeer.PeerId, remotePeerId, ex)); } finally { _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, remotePeerId)); _connectedOutbound.Remove(remotePeerId, out _); epOutbound?.Disconnect(); } }); return true; } public bool DisconnectRemotePeer(string remotePeerId) { var flag = false; if (_connectedInbound.Remove(remotePeerId, out var inbound)) { flag = true; inbound.Disconnect(); } if (_connectedOutbound.Remove(remotePeerId, out var outbound)) { flag = true; outbound.Disconnect(); } return flag; } public void AddOrReplaceRemotePeer(IRemotePeerInfo peerInfo) { _trustedRemotePeers[peerInfo.PeerId] = peerInfo; } public bool RemoveRemotePeer(string remotePeerId) { DisconnectRemotePeer(remotePeerId); return _trustedRemotePeers.Remove(remotePeerId, out _); } public void Start() { Task.Run(async () => { try { _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STARTING)); await _listener.StartAsync(); _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STARTED)); } catch (Exception ex) { _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.START_FAIL, ex)); } }); } public void Stop() { Task.Run(async () => { try { _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STOPPING)); await _listener.StopAsync(); _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STOPPED)); } catch (Exception ex) { _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_INBOUND_LISTEN_STATUS.STOP_FAIL, ex)); } }); foreach (var item in _connectedInbound) item.Value.Disconnect(); _connectedInbound.Clear(); foreach (var item in _connectedOutbound) item.Value.Disconnect(); _connectedOutbound.Clear(); } public bool SendToRemotePeer(string remotePeerId, byte[] payload) { bool isOutbound; if (_connectedOutbound.TryGetValue(remotePeerId, out var remote) == false) { if (_connectedInbound.TryGetValue(remotePeerId, out remote)) isOutbound = false; else return false; } else { isOutbound = true; } Task.Run(async () => { var (payloadAndNonce, timestamp) = _nonceManager.NewNonceWithPayload(payload); await remote.SendBlockAsync(payloadAndNonce); if (isOutbound) { _eventBus.Publish(new TPM_EVT_OUTBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload)); } else { _eventBus.Publish(new TPM_EVT_INBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload)); } }); return true; } }