using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Security.Cryptography;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
using PCC.App.Networking;
using PCC.App.Security;
using PCC.App.TransferModels;
using PCC.Common.EventBus;

namespace PCC.App;

// Wire protocol ("→" is client to server, "←" is server to client):
// Handshake 1 → Public Key SHA256
// Handshake 2 ← ACK for Yes or No (close connection)
// *Complete Handshake
// SendText 1 → Payload
// SendText 2 ← ACK
// ACK: SHA256(incoming payload)
// NOTE(review): the receive loops in this class only publish TPM_EVT_PEER_RX;
// they do not send the "SendText 2" ACK themselves - confirm where that is meant to happen.

/// <summary>
/// Keeps a registry of trusted peers (keyed by the hex SHA-256 of their public key),
/// opens outgoing encrypted connections to them, serves incoming ones, and reports
/// connection, traffic and failure events on the event bus.
/// </summary>
public abstract class TrustedPeerManager
{
    // The nonce manager is created with (NONCE_LENGTH_BYTES - TimestampNonceManager.TimestampLength).
    private const int NONCE_LENGTH_BYTES = 16;
    private const int NONCE_EXPIRE_SECOND = 60;
    private const int NONCE_SKEW_SECOND = 30;

    /// <summary>Command: load the local key pair. Must be handled before any connection is made.</summary>
    public record TPM_EVT_CMD_INIT(byte[] MyPub, byte[] MyPri);

    /// <summary>Command: disconnect every registered peer and dispose the nonce manager.</summary>
    public record TPM_EVT_CMD_SHUTDOWN;

    /// <summary>Declared for consumers; not published anywhere in this class.</summary>
    public record TPM_EVT_PEER_IX(string PeerId);

    /// <summary>Published after an outgoing connection has completed its handshake and been registered.</summary>
    public record TPM_EVT_PEER_CX(string PeerId);

    /// <summary>Published when a connection's receive loop ends, or when an attempt fails with an exception.</summary>
    public record TPM_EVT_PEER_DX(string PeerId);

    /// <summary>Published for every received block that passes the nonce check.</summary>
    public record TPM_EVT_PEER_RX(string PeerId, DateTimeOffset senderTimestamp, ReadOnlyMemory<byte> payload);

    /// <summary>Published after a payload has been sent to a peer.</summary>
    public record TPM_EVT_PEER_TX(string PeerId, DateTimeOffset senderTimestamp, ReadOnlyMemory<byte> payload);

    /// <summary>Published when a peer is rejected or a block fails validation.</summary>
    public record TPM_EVT_PEER_XX(XX Kind, string? PeerId);

    /// <summary>Failure kinds carried by <see cref="TPM_EVT_PEER_XX"/>.</summary>
    public enum XX
    {
        Invalid = 0,
        NonTrustPeer,
        HandshakeFailServerAckNotMatch,
        TimeStampSkew,
        ReplayAttackDetected,
    }

    private readonly IEventBus _eventBus;
    private readonly ILogger _logger;
    private readonly SocketConnectionContextFactory _connectionContextFactory;
    private RSA? _myPrivateKey;
    private ReadOnlyMemory<byte>? _myPeerId;

    // Peers allowed to connect, keyed by hex SHA-256 of their public key.
    private readonly ConcurrentDictionary<string, PeerInfo> _trustedPeers = new();

    // Connections opened by this side.
    private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedPeers = new();

    // Connections opened by the remote side.
    private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _IncomePeers = new();

    private readonly TimestampNonceManager _nonceManager;

    /// <summary>
    /// Wires up the event bus subscriptions for the init and shutdown commands and
    /// creates the nonce manager used to validate incoming blocks.
    /// </summary>
    /// <param name="eventBus">Bus used both for incoming commands and outgoing notifications.</param>
    /// <param name="logger">Logger for unexpected connection failures.</param>
    protected TrustedPeerManager(IEventBus eventBus, ILogger logger)
    {
        _eventBus = eventBus;
        _logger = logger;
        _connectionContextFactory = new(new(), _logger);

        // The expiry constant is expressed in seconds; FromMicroseconds made nonces expire after 60 µs.
        _nonceManager = new TimestampNonceManager(
            NONCE_LENGTH_BYTES - TimestampNonceManager.TimestampLength,
            TimeSpan.FromSeconds(NONCE_EXPIRE_SECOND),
            TimeSpan.FromSeconds(NONCE_SKEW_SECOND));

        eventBus.Subscript<TPM_EVT_CMD_INIT>(Init);
        eventBus.Subscript<TPM_EVT_CMD_SHUTDOWN>(Shutdown);
    }

    /// <summary>Loads the local private key and derives the local peer id (SHA-256 of the public key).</summary>
    private void Init(TPM_EVT_CMD_INIT obj)
    {
        _myPrivateKey = RsaUtility.FromPKCS1PrivateKey(obj.MyPri);
        _myPeerId = SHA256.HashData(obj.MyPub);
    }

    /// <summary>Disconnects and unregisters every outgoing and incoming peer, then disposes the nonce manager.</summary>
    private void Shutdown(TPM_EVT_CMD_SHUTDOWN _)
    {
        DisconnectAll(_connectedPeers);
        DisconnectAll(_IncomePeers);
        _nonceManager.Dispose();
    }

    /// <summary>Disconnects every peer in <paramref name="peers"/> and removes it from that same registry.</summary>
    private static void DisconnectAll(ConcurrentDictionary<string, EncryptedTcpPeer> peers)
    {
        // Removing while enumerating is supported by ConcurrentDictionary.
        foreach (var (peerId, peer) in peers)
        {
            peer.Disconnect();
            peers.TryRemove(peerId, out _);
        }
    }

    /// <summary>Registers (or replaces) a trusted peer.</summary>
    /// <param name="publicKey">The peer's public key in PKCS#1 form.</param>
    /// <param name="host">Host used for outgoing connections.</param>
    /// <param name="port">Port used for outgoing connections.</param>
    /// <returns>The peer id: upper-case hex of SHA-256 of <paramref name="publicKey"/>.</returns>
    public string AddPeer(byte[] publicKey, string host, int port)
    {
        var peerId = Convert.ToHexString(SHA256.HashData(publicKey));
        _trustedPeers[peerId] = new PeerInfo(RsaUtility.FromPKCS1PublicKey(publicKey), host, port);
        return peerId;
    }

    /// <summary>Removes a peer from the trusted set.</summary>
    /// <returns><c>true</c> if the peer was registered.</returns>
    public bool RemovePeer(string peerId) => _trustedPeers.TryRemove(peerId, out _);

    /// <summary>
    /// Starts a background task that connects to a trusted peer, performs the client side of
    /// the handshake and then runs the receive loop until the connection ends.
    /// </summary>
    /// <param name="peerId">Id returned by <see cref="AddPeer"/>.</param>
    /// <returns><c>false</c> if the peer is not trusted; <c>true</c> once the background task has been started.</returns>
    /// <exception cref="InvalidOperationException">The local key pair has not been initialised.</exception>
    public bool ConnectToPeerAsync(string peerId)
    {
        if (!_trustedPeers.TryGetValue(peerId, out var peerInfo))
        {
            return false;
        }

        if (_myPrivateKey == null || _myPeerId == null)
        {
            throw new InvalidOperationException("Not init");
        }

        // Capture the validated values so the background task does not re-read nullable fields.
        var privateKey = _myPrivateKey;
        var myPeerId = _myPeerId.Value;

        _ = Task.Run(() => RunOutgoingPeerAsync(peerId, peerInfo, privateKey, myPeerId));
        return true;
    }

    /// <summary>Connects, handshakes as client, registers the connection and pumps received blocks.</summary>
    private async Task RunOutgoingPeerAsync(string peerId, PeerInfo peerInfo, RSA privateKey, ReadOnlyMemory<byte> myPeerId)
    {
        EncryptedTcpPeer? epLocal = null;
        try
        {
            // Create peer client, connect to server and handshake as client.
            {
                var sckLocal = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                try
                {
                    await sckLocal.ConnectAsync(peerInfo.Host, peerInfo.Port);
                }
                catch
                {
                    // Nothing owns the socket yet, so release it here before reporting the failure.
                    sckLocal.Dispose();
                    throw;
                }

                var conn = _connectionContextFactory.Create(sckLocal);
                var tcpPeerLocal = new TcpPeer(conn);
                epLocal = new EncryptedTcpPeer(privateKey, peerInfo.PublicKey, tcpPeerLocal);

                // tx handshake 1
                var (handshake1, _) = _nonceManager.NewNonceWithPayload(myPeerId);
                await epLocal.SendBlockAsync(handshake1);

                // rx handshake 2 (10 second timeout; the token source is disposed when this block exits)
                using var handshakeTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
                var handshake2 = await epLocal.RxBlockAsync(handshakeTimeout.Token);
                if (handshake2 == null)
                {
                    // timeout? disconnect
                    epLocal.Disconnect();
                    return;
                }

                var (ackOk, _, ack) = _nonceManager.CheckValidAndExtractPayload(handshake2);
                if (ackOk != true)
                {
                    PublishNonceFailure(peerId, ackOk);
                    epLocal.Disconnect();
                    return;
                }

                // The server must answer with SHA256 of the handshake-1 block; otherwise disconnect.
                if (!ack.Span.SequenceEqual(SHA256.HashData(handshake1.Span)))
                {
                    PublishXX(peerId, XX.HandshakeFailServerAckNotMatch);
                    epLocal.Disconnect();
                    return;
                }
            }

            // replace existing connection
            if (_connectedPeers.TryRemove(peerId, out var exist))
            {
                exist.Disconnect();
            }

            // register as connected
            _connectedPeers[peerId] = epLocal;
            _eventBus.Publish(new TPM_EVT_PEER_CX(peerId));

            // RxCycle
            while (true)
            {
                var rxb = await epLocal.RxBlockAsync();
                if (rxb == null)
                {
                    break;
                }

                var (ok, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(rxb);
                if (ok != true)
                {
                    PublishNonceFailure(peerId, ok);
                    epLocal.Disconnect();
                    break;
                }

                _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, senderTimestamp, payload));
            }
        }
        catch (ConnectionAbortedException)
        {
            // ignore
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, nameof(ConnectToPeerAsync));
        }

        _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));

        if (epLocal != null)
        {
            // Unregister only our own entry: a newer connection may already have replaced it,
            // and a failed attempt must not evict a live connection for the same peer.
            _connectedPeers.TryRemove(KeyValuePair.Create(peerId, epLocal));
            epLocal.Disconnect();
        }
    }

    /// <summary>
    /// Serves one incoming connection: performs the server side of the handshake, registers the
    /// peer and pumps received blocks until the connection ends.
    /// </summary>
    /// <param name="connection">The accepted transport connection.</param>
    /// <exception cref="InvalidOperationException">The local private key has not been initialised.</exception>
    public async Task HandleIncomingPeerAsync(ConnectionContext connection)
    {
        if (_myPrivateKey == null)
        {
            throw new InvalidOperationException("Not init");
        }

        var privateKey = _myPrivateKey;
        string? peerId = null;
        EncryptedTcpPeer? epRemote = null;
        try
        {
            // handshake as server
            {
                var tpRemote = new TcpPeer(connection);

                // rx handshake 1
                PeerInfo? peerInfo;
                byte[] payloadOfHandshake1;
                {
                    // 10 second timeout; the token source is disposed when this block exits.
                    using var handshakeTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
                    var receivedHandshake = await tpRemote.RxBlockAsync(handshakeTimeout.Token);
                    if (receivedHandshake == null)
                    {
                        return;
                    }

                    var handshake1 = RsaUtility.Decrypt(privateKey, receivedHandshake);
                    var (handshakeOk, _, claimedId) = _nonceManager.CheckValidAndExtractPayload(handshake1);
                    if (handshakeOk != true)
                    {
                        PublishNonceFailure(null, handshakeOk);
                        tpRemote.Disconnect();
                        return;
                    }

                    peerId = Convert.ToHexString(claimedId.Span);

                    // Kick if non-trusted
                    if (!_trustedPeers.TryGetValue(peerId, out peerInfo))
                    {
                        PublishXX(peerId, XX.NonTrustPeer);
                        tpRemote.Disconnect();
                        return;
                    }

                    // verify signature
                    payloadOfHandshake1 = RsaUtility.DecryptAndVerifySignature(privateKey, receivedHandshake, peerInfo.PublicKey);
                }

                // tx handshake 2: ACK = SHA256 of the decrypted handshake-1 block
                {
                    epRemote = new EncryptedTcpPeer(privateKey, peerInfo.PublicKey, tpRemote);
                    var payloadOfHandshake2 = SHA256.HashData(payloadOfHandshake1);
                    var (handshake2, _) = _nonceManager.NewNonceWithPayload(payloadOfHandshake2);
                    await epRemote.SendBlockAsync(handshake2);
                }
            }

            // replace existing connection
            if (_IncomePeers.TryRemove(peerId, out var exist))
            {
                exist.Disconnect();
            }

            // register as income
            _IncomePeers[peerId] = epRemote;

            // RxCycle
            while (true)
            {
                var receivedBlock = await epRemote.RxBlockAsync();
                if (receivedBlock == null)
                {
                    break;
                }

                var (ok, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(receivedBlock);
                if (ok != true)
                {
                    PublishNonceFailure(peerId, ok);
                    break;
                }

                _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, senderTimestamp, payload));
            }
        }
        catch (ConnectionAbortedException)
        {
            // ignore
        }
        catch (Exception e)
        {
            _logger.LogError(e, nameof(HandleIncomingPeerAsync));
        }

        if (peerId != null)
        {
            _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));

            if (epRemote != null)
            {
                // Unregister and close only this connection. Removing by key alone would evict and
                // disconnect a newer connection from the same peer that has replaced this one.
                _IncomePeers.TryRemove(KeyValuePair.Create(peerId, epRemote));
                epRemote.Disconnect();
            }
        }
    }

    /// <summary>Publishes a failure event for <paramref name="peerId"/> (null when the peer is not yet identified).</summary>
    private void PublishXX(string? peerId, XX kind) => _eventBus.Publish(new TPM_EVT_PEER_XX(kind, peerId));

    /// <summary>
    /// Publishes the failure event matching a non-true nonce check result:
    /// null maps to <see cref="XX.TimeStampSkew"/>, false to <see cref="XX.ReplayAttackDetected"/>.
    /// </summary>
    private void PublishNonceFailure(string? peerId, bool? ok) =>
        PublishXX(peerId, ok switch
        {
            null => XX.TimeStampSkew,
            false => XX.ReplayAttackDetected,
            _ => XX.Invalid,
        });

    /// <summary>
    /// Queues <paramref name="payload"/> for sending to a peer over its outgoing connection,
    /// falling back to its incoming connection.
    /// </summary>
    /// <param name="peerId">Id of the target peer.</param>
    /// <param name="payload">Bytes to send; a nonce is added before sending.</param>
    /// <returns><c>false</c> if no connection to the peer is registered; <c>true</c> once the send has been queued.</returns>
    public bool SendToPeer(string peerId, byte[] payload)
    {
        if (!_connectedPeers.TryGetValue(peerId, out var peerCon)
            && !_IncomePeers.TryGetValue(peerId, out peerCon))
        {
            return false;
        }

        _ = Task.Run(async () =>
        {
            try
            {
                var (payloadAndNonce, timestamp) = _nonceManager.NewNonceWithPayload(payload);
                await peerCon.SendBlockAsync(payloadAndNonce);
                _eventBus.Publish(new TPM_EVT_PEER_TX(peerId, timestamp, payload));
            }
            catch (Exception ex)
            {
                // The task is not awaited by anyone, so log here instead of leaving the failure unobserved.
                _logger.LogError(ex, nameof(SendToPeer));
            }
        });

        return true;
    }

    // Public key and endpoint of a trusted peer.
    private record PeerInfo(RSA PublicKey, string Host, int Port);
}