TcpPeer.cs 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. using System.Buffers;
  2. using Microsoft.AspNetCore.Connections;
  3. using PCC.App.BinaryFormatter;
  4. namespace PCC.App.Networking;
  /// <summary>
  /// Sends and receives binary blocks over the duplex transport pipe of a
  /// <see cref="ConnectionContext"/>.
  /// </summary>
  /// <remarks>
  /// Inbound blocks are parsed as a 7-bit encoded integer length followed by that many
  /// payload bytes. Outbound framing is delegated to the <c>WriteBlock</c> extension
  /// (presumably the mirror image of the inbound format; confirm against its definition).
  /// </remarks>
  /// <param name="context">Connection whose <c>Transport</c> pipe is read from and written to.</param>
  public class TcpPeer(ConnectionContext context)
  {
      /// <summary>Writes <paramref name="block"/> to the output pipe and flushes it.</summary>
      /// <param name="block">Payload handed to <c>WriteBlock</c>.</param>
      /// <returns>A task that completes once the flush has completed.</returns>
      public async Task SendBlockAsync(ReadOnlyMemory<byte> block)
      {
          context.Transport.Output.WriteBlock(block);
          await context.Transport.Output.FlushAsync();
      }

      /// <summary>Writes <paramref name="block"/> to the output pipe and flushes it.</summary>
      /// <param name="block">Payload handed to <c>WriteBlock</c>.</param>
      /// <returns>A task that completes once the flush has completed.</returns>
      public async Task SendBlockAsync(byte[] block)
      {
          context.Transport.Output.WriteBlock(block);
          await context.Transport.Output.FlushAsync();
      }

      /// <summary>
      /// Reads from the input pipe until one complete block is available and returns its payload.
      /// </summary>
      /// <param name="timeOut">
      /// Token passed to <c>ReadAsync</c>; <see langword="null"/> means <see cref="CancellationToken.None"/>.
      /// NOTE(review): a fired token is expected to surface as an exception from <c>ReadAsync</c>
      /// rather than as a <see langword="null"/> return; confirm callers handle that.
      /// </param>
      /// <returns>
      /// The block payload, or <see langword="null"/> when the read result is flagged as canceled or
      /// the input completes before a whole block has arrived.
      /// </returns>
      /// <exception cref="InvalidDataException">The length prefix is malformed or negative.</exception>
      public async Task<byte[]?> RxBlockAsync(CancellationToken? timeOut = null)
      {
          var token = timeOut ?? CancellationToken.None;

          while (true)
          {
              var result = await context.Transport.Input.ReadAsync(token);
              if (result.IsCanceled)
              {
                  // Hand the untouched buffer back (nothing consumed, nothing examined) so the
                  // reader is left in a clean state and buffered bytes stay available to a later call.
                  context.Transport.Input.AdvanceTo(result.Buffer.Start);
                  break;
              }

              // ReadBlock calls AdvanceTo itself on every non-throwing path.
              var block = ReadBlock(result.Buffer);
              if (block != null)
              {
                  return block;
              }

              if (result.IsCompleted)
              {
                  // No more data will arrive; any partial block left in the buffer is dropped.
                  break;
              }
          }

          return null;
      }

      /// <summary>
      /// Tries to cut one length-prefixed block out of <paramref name="rx"/> and advances the
      /// input pipe accordingly.
      /// </summary>
      /// <param name="rx">Buffer returned by the most recent <c>ReadAsync</c> call.</param>
      /// <returns>
      /// A copy of the payload when a whole block is buffered; otherwise <see langword="null"/>,
      /// with nothing consumed and the whole buffer marked as examined.
      /// </returns>
      /// <exception cref="InvalidDataException">The length prefix is malformed or negative.</exception>
      private byte[]? ReadBlock(ReadOnlySequence<byte> rx)
      {
          var reader = new SequenceReader<byte>(rx);
          var lenR = reader.TryRead7BitEncodedInt(out var len);
          if (!lenR.HasValue)
          {
              // No value is treated as "prefix not fully buffered yet". Mark everything examined
              // so the next ReadAsync waits for new bytes instead of returning the same buffer
              // again, regardless of where the helper left the reader position.
              context.Transport.Input.AdvanceTo(rx.Start, rx.End);
              return null;
          }

          if (lenR == false) throw new InvalidDataException("Invalid 7Bit Encoded Int");

          // A negative length can never be satisfied; without this check the read below would
          // fail silently and an empty block would be returned with only the prefix consumed.
          if (len < 0) throw new InvalidDataException("Negative block length");

          // Compare in 64-bit against what is left after the prefix. The previous
          // "(int)Consumed + len" could overflow for large lengths and skip the wait.
          // NOTE(review): len is not capped; consider enforcing a maximum block size, since a peer
          // can announce a very large length and keep this reader waiting for it.
          if (reader.Remaining < len)
          {
              context.Transport.Input.AdvanceTo(rx.Start, rx.End);
              return null;
          }

          // Cannot fail: 0 <= len <= reader.Remaining was established above.
          reader.TryReadExact(len, out var payload);
          var block = payload.ToArray();

          // Consume the prefix and the payload; bytes after them stay buffered for the next call.
          context.Transport.Input.AdvanceTo(reader.Position);
          return block;
      }

      /// <summary>Completes the input side and then the output side of the transport pipe.</summary>
      public void Disconnect()
      {
          context.Transport.Input.Complete();
          context.Transport.Output.Complete();
      }
  }