123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- using System.Collections.Concurrent;
- using System.Net.Sockets;
- using System.Security.Cryptography;
- using Microsoft.AspNetCore.Connections;
- using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
- using PCC.App.Networking;
- using PCC.App.Security;
- using PCC.Common.EventBus;
- namespace PCC.App;
// Wire protocol
//   Handshake 1 → SHA-256 of the sender's public key
//   Handshake 2 ← ACK if the peer is accepted; otherwise the connection is closed
//   * Handshake complete
//   SendText 1  → payload
//   SendText 2  ← ACK
//   ACK = SHA-256(incoming payload)
/// <summary>
/// Keeps a registry of trusted peers (identified by the upper-case hex SHA-256 of their public key)
/// and runs the encrypted TCP sessions to and from them, reporting connection, disconnection and
/// received blocks on the event bus.
/// </summary>
public abstract class TrustedPeerManager
{
    /// <summary>Command: load the local key pair. <c>MyPub</c> is hashed into the local peer id; <c>MyPri</c> is parsed into the local private key.</summary>
    public record TPM_EVT_CMD_INIT(byte[] MyPub, byte[] MyPri);

    /// <summary>Command: disconnect every registered outbound and inbound peer.</summary>
    public record TPM_EVT_CMD_SHUTDOWN;

    /// <summary>Published once an outbound connection has completed its handshake and been registered.</summary>
    public record TPM_EVT_PEER_CX(string PeerId);

    /// <summary>Published once an inbound connection has completed its handshake and been registered.</summary>
    public record TPM_EVT_PEER_IX(string PeerId);

    /// <summary>Published when a peer session ends (and, for outbound attempts, when the attempt fails with an exception).</summary>
    public record TPM_EVT_PEER_DX(string PeerId);

    /// <summary>Published for every block received from a registered peer.</summary>
    public record TPM_EVT_PEER_RX(string PeerId, byte[] block);

    /// <summary>Maximum time to wait for the remote side's handshake block.</summary>
    private static readonly TimeSpan HandshakeTimeout = TimeSpan.FromSeconds(10);

    private readonly IEventBus _eventBus;
    private readonly ILogger<TrustedPeerManager> _logger;
    private readonly SocketConnectionContextFactory _connectionContextFactory;

    // Null until TPM_EVT_CMD_INIT has been handled.
    private RSA? _myPrivateKey;
    private ReadOnlyMemory<byte> _myPeerId;

    private readonly ConcurrentDictionary<string, PeerInfo> _trustedPeers = new();
    private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedPeers = new();
    private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _incomingPeers = new();

    /// <summary>
    /// Stores the collaborators and subscribes to the init and shutdown commands on <paramref name="eventBus"/>.
    /// </summary>
    /// <param name="eventBus">Bus used both to receive commands and to publish peer events.</param>
    /// <param name="logger">Logger for session failures; also handed to the socket connection factory.</param>
    protected TrustedPeerManager(IEventBus eventBus, ILogger<TrustedPeerManager> logger)
    {
        _eventBus = eventBus;
        _logger = logger;
        _connectionContextFactory = new(new(), _logger);
        eventBus.Subscript<TPM_EVT_CMD_INIT>(Init);
        eventBus.Subscript<TPM_EVT_CMD_SHUTDOWN>(Shutdown);
    }

    /// <summary>Loads the local private key and derives the local peer id (SHA-256 of the public key bytes).</summary>
    private void Init(TPM_EVT_CMD_INIT obj)
    {
        _myPrivateKey = RsaUtility.FromPKCS1PrivateKey(obj.MyPri);
        _myPeerId = SHA256.HashData(obj.MyPub);
    }

    /// <summary>Disconnects every currently registered peer, outbound first and then inbound.</summary>
    private void Shutdown(TPM_EVT_CMD_SHUTDOWN _)
    {
        foreach (var outbound in _connectedPeers.Values)
        {
            outbound.Disconnect();
        }

        foreach (var inbound in _incomingPeers.Values)
        {
            inbound.Disconnect();
        }
    }

    /// <summary>
    /// Registers (or overwrites) a trusted peer.
    /// </summary>
    /// <param name="tpk">The peer's public key bytes, as accepted by <c>RsaUtility.FromPKCS1PublicKey</c>.</param>
    /// <param name="host">Host used when connecting to the peer.</param>
    /// <param name="port">TCP port used when connecting to the peer.</param>
    /// <returns>The peer id: the upper-case hex SHA-256 of <paramref name="tpk"/>.</returns>
    public string AddTrustPeer(byte[] tpk, string host, int port)
    {
        var peerId = Convert.ToHexString(SHA256.HashData(tpk));
        var rsa = RsaUtility.FromPKCS1PublicKey(tpk);
        _trustedPeers[peerId] = new PeerInfo(rsa, host, port);
        return peerId;
    }

    /// <summary>
    /// Starts a background connection attempt to a trusted peer. The outcome is reported through
    /// <see cref="TPM_EVT_PEER_CX"/> / <see cref="TPM_EVT_PEER_DX"/> on the event bus.
    /// </summary>
    /// <param name="peerId">Id previously returned by <see cref="AddTrustPeer"/>.</param>
    /// <returns><c>false</c> if the peer is not in the trusted registry; otherwise <c>true</c> (the attempt has been started, not completed).</returns>
    public bool ConnectToPeerAsync(string peerId)
    {
        if (!_trustedPeers.TryGetValue(peerId, out var peerInfo))
        {
            return false;
        }

        // Fire-and-forget: the session task handles and logs its own failures.
        _ = Task.Run(() => RunOutboundSessionAsync(peerId, peerInfo));
        return true;
    }

    /// <summary>
    /// Serves one inbound connection: authenticates the remote peer, registers it, and pumps
    /// received blocks to the event bus until the connection ends.
    /// </summary>
    /// <param name="connection">The accepted transport connection.</param>
    public async Task HandleIncomingPeerAsync(ConnectionContext connection)
    {
        (string PeerId, EncryptedTcpPeer Peer)? session = null;
        try
        {
            var privateKey = RequirePrivateKey();

            session = await AcceptHandshakeAsync(privateKey, connection);
            if (session is not { } accepted)
            {
                // No handshake block, or the claimed id is not trusted: drop silently.
                return;
            }

            // register as income
            //TODO: replace exist connection
            _incomingPeers[accepted.PeerId] = accepted.Peer;
            _eventBus.Publish(new TPM_EVT_PEER_IX(accepted.PeerId));

            await PumpReceivedBlocksAsync(accepted.PeerId, accepted.Peer);
        }
        catch (Exception e)
        {
            _logger.LogError(e, nameof(HandleIncomingPeerAsync));
        }

        // Only an authenticated, registered session may announce a disconnect; a handshake that
        // merely *claimed* a trusted id and then failed verification must not.
        if (session is { } ended)
        {
            Unregister(_incomingPeers, ended.PeerId, ended.Peer);
            _eventBus.Publish(new TPM_EVT_PEER_DX(ended.PeerId));
        }
    }

    /// <summary>
    /// Queues a block for sending to a registered peer, preferring the outbound connection over the inbound one.
    /// </summary>
    /// <param name="peerId">Id of the destination peer.</param>
    /// <param name="block">Payload to send.</param>
    /// <returns><c>false</c> if no connection to the peer is registered; otherwise <c>true</c> (the send has been started, not confirmed).</returns>
    public bool SendToPeer(string peerId, byte[] block)
    {
        if (!_connectedPeers.TryGetValue(peerId, out var peerCon) && !_incomingPeers.TryGetValue(peerId, out peerCon))
        {
            return false;
        }

        _ = SendAndLogFailureAsync(peerId, peerCon, block);
        return true;
    }

    /// <summary>Connects to a trusted peer, performs the client handshake and runs the receive loop until the session ends.</summary>
    private async Task RunOutboundSessionAsync(string peerId, PeerInfo peerInfo)
    {
        EncryptedTcpPeer? registered = null;
        try
        {
            var privateKey = RequirePrivateKey();

            // connect to server
            using var sck = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            await sck.ConnectAsync(peerInfo.Host, peerInfo.Port);
            var conn = _connectionContextFactory.Create(sck);
            var peer = new TcpPeer(conn, sck);

            // handshake as client
            var ep = new EncryptedTcpPeer(privateKey, peerInfo.PublicKey, peer);
            if (!await ClientHandshakeAsync(ep))
            {
                peer.Disconnect();
                return;
            }

            // register as connected
            //TODO: replace exist connection
            registered = ep;
            _connectedPeers[peerId] = ep;
            _eventBus.Publish(new TPM_EVT_PEER_CX(peerId));

            await PumpReceivedBlocksAsync(peerId, ep);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, nameof(ConnectToPeerAsync));
        }

        if (registered is not null)
        {
            Unregister(_connectedPeers, peerId, registered);
        }

        _eventBus.Publish(new TPM_EVT_PEER_DX(peerId));
    }

    /// <summary>Sends the local peer id and checks that the reply is the SHA-256 of that id.</summary>
    /// <returns><c>true</c> if the expected acknowledgement arrived within <see cref="HandshakeTimeout"/>.</returns>
    private async Task<bool> ClientHandshakeAsync(EncryptedTcpPeer ep)
    {
        await ep.SendBlock(_myPeerId); // handshake 1

        using var timeout = new CancellationTokenSource(HandshakeTimeout);
        var blockAck = await ep.RxBlockAsync(timeout.Token);
        return blockAck?.SequenceEqual(SHA256.HashData(_myPeerId.Span)) == true;
    }

    /// <summary>
    /// Performs the server side of the handshake on a fresh connection.
    /// </summary>
    /// <returns>The authenticated peer id and its encrypted channel, or <c>null</c> when no handshake block arrived or the claimed id is not trusted.</returns>
    private async Task<(string PeerId, EncryptedTcpPeer Peer)?> AcceptHandshakeAsync(RSA privateKey, ConnectionContext connection)
    {
        var peer = new TcpPeer(connection, null);

        using var timeout = new CancellationTokenSource(HandshakeTimeout);
        var eb = await peer.RxBlockAsync(timeout.Token);
        if (eb == null)
        {
            return null;
        }

        // extract the claimed peer id; it is not yet authenticated at this point
        var claimedId = Convert.ToHexString(RsaUtility.Decrypt(privateKey, eb));

        // Kick if non trusted
        if (!_trustedPeers.TryGetValue(claimedId, out var peerInfo))
        {
            return null;
        }

        // verify signature against the public key registered for the claimed id
        var data = RsaUtility.DecryptAndVerifySignature(privateKey, eb, peerInfo.PublicKey);
        var ep = new EncryptedTcpPeer(privateKey, peerInfo.PublicKey, peer);
        await ep.SendBlock(SHA256.HashData(data)); // ACK
        return (claimedId, ep);
    }

    /// <summary>Publishes every received block as <see cref="TPM_EVT_PEER_RX"/> until the peer yields a null block.</summary>
    private async Task PumpReceivedBlocksAsync(string peerId, EncryptedTcpPeer peer)
    {
        while (true)
        {
            var block = await peer.RxBlockAsync();
            if (block == null)
            {
                break;
            }

            _eventBus.Publish(new TPM_EVT_PEER_RX(peerId, block));
        }
    }

    /// <summary>Awaits a send started by <see cref="SendToPeer"/> so that a failure is logged instead of going unobserved.</summary>
    private async Task SendAndLogFailureAsync(string peerId, EncryptedTcpPeer peer, byte[] block)
    {
        try
        {
            await peer.SendBlock(block);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{Operation} to peer {PeerId} failed", nameof(SendToPeer), peerId);
        }
    }

    /// <summary>Returns the local private key, or throws if the init command has not been handled yet.</summary>
    /// <exception cref="InvalidOperationException">The private key has not been loaded.</exception>
    private RSA RequirePrivateKey() =>
        _myPrivateKey ?? throw new InvalidOperationException(
            $"{nameof(TrustedPeerManager)} has not been initialised; publish {nameof(TPM_EVT_CMD_INIT)} first.");

    /// <summary>
    /// Removes <paramref name="peer"/> from <paramref name="registry"/> only if it is still the entry
    /// stored under <paramref name="peerId"/>, so a finished session cannot evict a newer connection.
    /// </summary>
    private static void Unregister(ConcurrentDictionary<string, EncryptedTcpPeer> registry, string peerId, EncryptedTcpPeer peer) =>
        registry.TryRemove(KeyValuePair.Create(peerId, peer));

    /// <summary>Public key and network endpoint of a trusted peer.</summary>
    private sealed record PeerInfo(RSA PublicKey, string Host, int Port);
}
|