123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- using System.Collections.Concurrent;
- using System.Net.Sockets;
- using System.Security.Cryptography;
- using Microsoft.AspNetCore.Connections;
- using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
- using PCC.App.Networking;
- using PCC.App.Security;
- using PCC.Common.EventBus;
- using PCC.Common.Networking;
- namespace PCC.App.Tpm;
- internal class LocalPeerManager
- {
- private readonly KestrelTcpServer _listener;
- private readonly SocketConnectionContextFactory _sckConnectionContextFactory;
- private readonly TimestampNonceManager _nonceManager;
- private readonly IEventBus _eventBus;
- private readonly ILocalPeerInfo _localPeer;
- private readonly ConcurrentDictionary<string, IRemotePeerInfo> _trustedRemotePeers;
- private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedInbound = new();
- private readonly ConcurrentDictionary<string, EncryptedTcpPeer> _connectedOutbound = new();
- public LocalPeerManager(SocketConnectionContextFactory sckConnectionContextFactory, TimestampNonceManager nonceManager, ILocalPeerInfo localPeer, IEventBus eventBus, ILogger<TrustedPeerManager> logger, ILogger<KestrelTcpServer> ktsLogger)
- {
- _sckConnectionContextFactory = sckConnectionContextFactory;
- _nonceManager = nonceManager;
- _localPeer = localPeer;
- _eventBus = eventBus;
- _listener = new(localPeer.Address, localPeer.Port, HandleInboundConnection, ktsLogger);
- _trustedRemotePeers = new(localPeer.TrustedRemotePeers.ToDictionary(p => p.PeerId));
- }
- private async Task HandleInboundConnection(ConnectionContext connection)
- {
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_ACCEPTED(_localPeer.PeerId, connection.ConnectionId, connection.RemoteEndPoint));
- string? remotePeerId = null;
- EncryptedTcpPeer? epRemote = null;
- try
- {
- // handshake as server
- {
- var tpRemote = new TcpPeer(connection);
- //rx handshake1
- IRemotePeerInfo peerPeerInfo;
- byte[] payloadOfHandshake1;
- {
- var receivedBlock = await tpRemote.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
- if (receivedBlock == null) return;
- var handshake1 = RsaUtility.Decrypt(_localPeer.PrivateKey, receivedBlock);
- var (nonceResult, _, payload) = _nonceManager.CheckValidAndExtractPayload(handshake1);
- if (nonceResult != TimestampNonceResult.OK)
- {
- if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connection.ConnectionId, null));
- if (nonceResult == TimestampNonceResult.TimestampSkew)
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connection.ConnectionId, null));
- return;
- }
- remotePeerId = Convert.ToHexString(payload.Span);
- // Kick if non-trusted
- if (_trustedRemotePeers.TryGetValue(remotePeerId, out peerPeerInfo!) == false)
- {
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_FAIL_NOT_TRUSTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
- return;
- }
- // verify signature
- payloadOfHandshake1 = RsaUtility.DecryptAndVerifySignature(_localPeer.PrivateKey, receivedBlock, peerPeerInfo.PublicKey);
- }
- //tx handshake2
- {
- epRemote = new EncryptedTcpPeer(_localPeer.PrivateKey, peerPeerInfo.PublicKey, tpRemote);
- var payloadOfHandshake2 = SHA256.HashData(payloadOfHandshake1);
- var (handshake2, _) = _nonceManager.NewNonceWithPayload(payloadOfHandshake2);
- await epRemote.SendBlockAsync(handshake2);
- }
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
- }
- //replace exist connection
- if (_connectedInbound.Remove(remotePeerId, out var exist)) exist.Disconnect();
- _connectedInbound[remotePeerId] = epRemote;
- // start RxCycle
- while (true)
- {
- var receivedBlock = await epRemote.RxBlockAsync();
- if (receivedBlock == null) break;
- var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(receivedBlock);
- if (nonceResult != TimestampNonceResult.OK)
- {
- if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
- if (nonceResult == TimestampNonceResult.TimestampSkew)
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
- break;
- }
- _eventBus.Publish(new TPM_EVT_INBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload));
- }
- }
- catch (Exception exception)
- {
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_ERROR(_localPeer.PeerId, connection.ConnectionId, remotePeerId, exception));
- }
- finally
- {
- _eventBus.Publish(new TPM_EVT_INBOUND_CON_STATUS_CHANGED(TPM_EVT_INBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, connection.ConnectionId, remotePeerId));
- epRemote?.Disconnect();
- if (remotePeerId != null) _connectedInbound.Remove(remotePeerId, out _);
- }
- }
- public bool ConnectToRemotePeer(string remotePeerId)
- {
- if (_trustedRemotePeers.TryGetValue(remotePeerId, out var peerInfo) == false) return false;
- _ = Task.Run(async () =>
- {
- EncryptedTcpPeer? epOutbound = null;
- try
- {
- //Create Peer client and connect to server
- {
- var sckOutbound = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.CONNECTION_ATTEMPT, _localPeer.PeerId, remotePeerId));
- await sckOutbound.ConnectAsync(peerInfo.Host, peerInfo.Port);
- var connOutbound = _sckConnectionContextFactory.Create(sckOutbound);
- var tcpPeerOutbound = new TcpPeer(connOutbound);
- epOutbound = new EncryptedTcpPeer(_localPeer.PrivateKey, peerInfo.PublicKey, tcpPeerOutbound);
- // handshake as client
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_ATTEMPT, _localPeer.PeerId, remotePeerId));
- // ☞ tx handshake 1
- var (handshake1, _) = _nonceManager.NewNonceWithPayload(Convert.FromHexString(_localPeer.PeerId));
- await epOutbound.SendBlockAsync(handshake1);
- // ☞ rx handshake 2
- var handshake2 = await epOutbound.RxBlockAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
- if (handshake2 == null)
- {
- return;
- } // timeout or disconnect
- var (nonceResult, _, pl) = _nonceManager.CheckValidAndExtractPayload(handshake2);
- if (nonceResult != TimestampNonceResult.OK)
- {
- if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId));
- if (nonceResult == TimestampNonceResult.TimestampSkew)
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId));
- epOutbound.Disconnect();
- return;
- }
- // handshake not match, disconnect
- if (pl.Span.SequenceEqual(SHA256.HashData(handshake1.Span)) != true)
- {
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_FAIL_ACK_NOT_MATCHED, _localPeer.PeerId, remotePeerId));
- return;
- }
- }
- //replace exist connection
- if (_connectedOutbound.Remove(remotePeerId, out var exist)) exist.Disconnect();
- _connectedOutbound[remotePeerId] = epOutbound;
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.HANDSHAKE_OK, _localPeer.PeerId, remotePeerId));
- while (true) // start RxCycle
- {
- var rxb = await epOutbound.RxBlockAsync();
- if (rxb == null) break;
- var (nonceResult, senderTimestamp, payload) = _nonceManager.CheckValidAndExtractPayload(rxb);
- if (nonceResult != TimestampNonceResult.OK)
- {
- if (nonceResult == TimestampNonceResult.ReplayAttackDetected)
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_ALERT_REPLAY_ATTACK_DETECT, _localPeer.PeerId, remotePeerId));
- if (nonceResult == TimestampNonceResult.TimestampSkew)
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.SECURE_WARN_TIMESTAMP_SKEW, _localPeer.PeerId, remotePeerId));
- break;
- }
- _eventBus.Publish(new TPM_EVT_OUTBOUND_RX(_localPeer.PeerId, remotePeerId, senderTimestamp, payload));
- }
- }
- catch (Exception ex)
- {
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_ERROR(_localPeer.PeerId, remotePeerId, ex));
- }
- finally
- {
- _eventBus.Publish(new TPM_EVT_OUTBOUND_CON_STATUS_CHANGED(TPM_EVT_OUTBOUND_CON_STATUS.DISCONNECTED, _localPeer.PeerId, remotePeerId));
- _connectedOutbound.Remove(remotePeerId, out _);
- epOutbound?.Disconnect();
- }
- });
- return true;
- }
- public bool DisconnectRemotePeer(string remotePeerId)
- {
- var flag = false;
- if (_connectedInbound.Remove(remotePeerId, out var inbound))
- {
- flag = true;
- inbound.Disconnect();
- }
- if (_connectedOutbound.Remove(remotePeerId, out var outbound))
- {
- flag = true;
- outbound.Disconnect();
- }
- return flag;
- }
- public void AddOrReplaceRemotePeer(IRemotePeerInfo peerInfo)
- {
- _trustedRemotePeers[peerInfo.PeerId] = peerInfo;
- }
- public bool RemoveRemotePeer(string remotePeerId)
- {
- DisconnectRemotePeer(remotePeerId);
- return _trustedRemotePeers.Remove(remotePeerId, out _);
- }
- public void Start()
- {
- Task.Run(async () =>
- {
- try
- {
- _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STARTING));
- await _listener.StartAsync();
- _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STARTED));
- }
- catch (Exception ex)
- {
- _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.START_FAIL, ex));
- }
- });
- }
- public void Stop()
- {
- Task.Run(async () =>
- {
- try
- {
- _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STOPPING));
- await _listener.StopAsync();
- _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STOPPED));
- }
- catch (Exception ex)
- {
- _eventBus.Publish(new TPM_EVT_LISTENER_STATUS_CHANGED(_localPeer.PeerId, TPM_EVT_LISTEN_STATUS.STOP_FAIL, ex));
- }
- });
- foreach (var item in _connectedInbound) item.Value.Disconnect();
- _connectedInbound.Clear();
- foreach (var item in _connectedOutbound) item.Value.Disconnect();
- _connectedOutbound.Clear();
- }
- public bool SendToRemotePeer(string remotePeerId, byte[] payload)
- {
- bool isOutbound;
- if (_connectedOutbound.TryGetValue(remotePeerId, out var remote) == false)
- {
- if (_connectedInbound.TryGetValue(remotePeerId, out remote))
- isOutbound = false;
- else
- return false;
- }
- else
- {
- isOutbound = true;
- }
- Task.Run(async () =>
- {
- var (payloadAndNonce, timestamp) = _nonceManager.NewNonceWithPayload(payload);
- await remote.SendBlockAsync(payloadAndNonce);
- if (isOutbound)
- {
- _eventBus.Publish(new TPM_EVT_OUTBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload));
- }
- else
- {
- _eventBus.Publish(new TPM_EVT_INBOUND_TX(_localPeer.PeerId, remotePeerId, timestamp, payload));
- }
- });
- return true;
- }
- }
|