Browse Source

Improved connection buffer implementation

Tal Aloni 8 years ago
parent
commit
bd61fc4424

+ 2 - 3
ISCSI/ISCSI.Client/ConnectionState.cs

@@ -12,8 +12,7 @@ namespace ISCSI.Client
 {
     internal class ConnectionState
     {
-        public const int ReceiveBufferSize = 131072; // Note: FirstBurstLength, MaxBurstLength and MaxRecvDataSegmentLength put a cap on DataSegmentLength, NOT on the PDU length.
-        public byte[] ReceiveBuffer = new byte[ReceiveBufferSize]; // immediate receive buffer
-        public byte[] ConnectionBuffer = new byte[0]; // we append the receive buffer here until we have a complete PDU
+        public static int ReceiveBufferSize = ISCSIPDU.BasicHeaderSegmentLength + ISCSIClient.DeclaredParameters.MaxRecvDataSegmentLength;
+        public ISCSIConnectionReceiveBuffer ReceiveBuffer = new ISCSIConnectionReceiveBuffer(ReceiveBufferSize);
     }
 }

+ 21 - 68
ISCSI/ISCSI.Client/ISCSIClient.cs

@@ -56,7 +56,8 @@ namespace ISCSI.Client
                     return false;
                 }
                 ConnectionState state = new ConnectionState();
-                m_currentAsyncResult = m_clientSocket.BeginReceive(state.ReceiveBuffer, 0, state.ReceiveBuffer.Length, SocketFlags.None, new AsyncCallback(OnClientSocketReceive), state);
+                ISCSIConnectionReceiveBuffer buffer = state.ReceiveBuffer;
+                m_currentAsyncResult = m_clientSocket.BeginReceive(buffer.Buffer, buffer.WriteOffset, buffer.AvailableLength, SocketFlags.None, new AsyncCallback(OnClientSocketReceive), state);
                 m_isConnected = true;
             }
             return m_isConnected;
@@ -325,12 +326,13 @@ namespace ISCSI.Client
             }
             else
             {
-                byte[] currentBuffer = ByteReader.ReadBytes(state.ReceiveBuffer, 0, numberOfBytesReceived);
-                ProcessCurrentBuffer(currentBuffer, state);
+                ISCSIConnectionReceiveBuffer buffer = state.ReceiveBuffer;
+                buffer.SetNumberOfBytesReceived(numberOfBytesReceived);
+                ProcessConnectionBuffer(state);
 
                 try
                 {
-                    m_currentAsyncResult = m_clientSocket.BeginReceive(state.ReceiveBuffer, 0, state.ReceiveBuffer.Length, SocketFlags.None, new AsyncCallback(OnClientSocketReceive), state);
+                    m_currentAsyncResult = m_clientSocket.BeginReceive(buffer.Buffer, buffer.WriteOffset, buffer.AvailableLength, SocketFlags.None, new AsyncCallback(OnClientSocketReceive), state);
                 }
                 catch (ObjectDisposedException)
                 {
@@ -345,79 +347,30 @@ namespace ISCSI.Client
             }
         }
 
-        private void ProcessCurrentBuffer(byte[] currentBuffer, ConnectionState state)
+        private void ProcessConnectionBuffer(ConnectionState state)
         {
-            if (state.ConnectionBuffer.Length == 0)
+            ISCSIConnectionReceiveBuffer buffer = state.ReceiveBuffer;
+            while (buffer.HasCompletePDU())
             {
-                state.ConnectionBuffer = currentBuffer;
-            }
-            else
-            {
-                state.ConnectionBuffer = ByteUtils.Concatenate(state.ConnectionBuffer, currentBuffer);
-            }
-
-            // we now have all PDU bytes received so far in state.ConnectionBuffer
-            int bytesLeftInBuffer = state.ConnectionBuffer.Length;
+                ISCSIPDU pdu = null;
+                try
+                {
+                    pdu = buffer.DequeuePDU();
+                }
+                catch (Exception)
+                {
+                    throw;
+                }
 
-            while (bytesLeftInBuffer >= 8)
-            {
-                int bufferOffset = state.ConnectionBuffer.Length - bytesLeftInBuffer;
-                int pduLength = ISCSIPDU.GetPDULength(state.ConnectionBuffer, bufferOffset);
-                if (pduLength > bytesLeftInBuffer)
+                if (pdu.GetType() == typeof(ISCSIPDU))
                 {
-                    break;
+                    throw new Exception("Unsupported");
                 }
                 else
                 {
-                    ISCSIPDU pdu = null;
-                    try
-                    {
-                        pdu = ISCSIPDU.GetPDU(state.ConnectionBuffer, bufferOffset);
-                    }
-                    catch (UnsupportedSCSICommandException)
-                    {
-                        throw;
-                    }
-                    catch (Exception)
-                    {
-                        throw;
-                    }
-
-                    bytesLeftInBuffer -= pduLength;
-
-                    if (pdu.GetType() == typeof(ISCSIPDU))
-                    {
-                        /*
-                        Log("[{0}][ProcessCurrentBuffer] Unsupported PDU (0x{1})", state.Connection.Identifier, pdu.OpCode.ToString("X"));
-                        // Unsupported PDU
-                        RejectPDU reject = new RejectPDU();
-                        reject.Reason = RejectReason.CommandNotSupported;
-                        reject.StatSN = state.Connection.StatSN;
-                        reject.ExpCmdSN = state.Connection.ExpCmdSN;
-                        reject.MaxCmdSN = state.Connection.ExpCmdSN + ISCSIServer.CommandQueueSize;
-                        reject.Data = ByteReader.ReadBytes(pduBytes, 0, 48);
-
-                        // StatSN is advanced after a Reject
-                        state.Connection.StatSN++;
-
-                        TrySendPDU(state, reject);*/
-                        throw new Exception("Unsupported");
-                    }
-                    else
-                    {
-                        ProcessPDU(pdu, state);
-                    }
+                    ProcessPDU(pdu, state);
                 }
             }
-
-            if (bytesLeftInBuffer > 0)
-            {
-                state.ConnectionBuffer = ByteReader.ReadBytes(state.ConnectionBuffer, state.ConnectionBuffer.Length - bytesLeftInBuffer, bytesLeftInBuffer);
-            }
-            else
-            {
-                state.ConnectionBuffer = new byte[0];
-            }
         }
 
         private void ProcessPDU(ISCSIPDU pdu, ConnectionState state)

+ 132 - 0
ISCSI/ISCSI.Common/ISCSIConnectionBuffer.cs

@@ -0,0 +1,132 @@
+/* Copyright (C) 2012-2016 Tal Aloni <tal.aloni.il@gmail.com>. All rights reserved.
+ * 
+ * You can redistribute this program and/or modify it under the terms of
+ * the GNU Lesser Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ */
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Utilities;
+
+namespace ISCSI
+{
+    /// <summary>
+    /// iSCSO Connection Receive Buffer
+    /// </summary>
+    public class ISCSIConnectionReceiveBuffer
+    {
+        private byte[] m_buffer;
+        private int m_readOffset = 0;
+        private int m_bytesInBuffer = 0;
+        private int? m_pduLength;
+
+        /// <param name="bufferLength">
+        /// bufferLength should be set to BasicHeaderSegmentLength + TotalAHSLength + HeaderDigestLength + MaxRecvDataSegmentLength + DataDigestLength.
+        /// (because DataSegmentLength MUST not exceed MaxRecvDataSegmentLength for the direction it is sent)
+        /// </param>
+        public ISCSIConnectionReceiveBuffer(int bufferLength)
+        {
+            m_buffer = new byte[bufferLength];
+        }
+
+        public void SetNumberOfBytesReceived(int numberOfBytesReceived)
+        {
+            m_bytesInBuffer += numberOfBytesReceived;
+        }
+
+        public bool HasCompletePDU()
+        {
+            if (m_bytesInBuffer >= 8)
+            {
+                if (!m_pduLength.HasValue)
+                {
+                    m_pduLength = ISCSIPDU.GetPDULength(m_buffer, m_readOffset);
+                }
+                return m_bytesInBuffer >= m_pduLength.Value;
+            }
+            return false;
+        }
+
+        /// <summary>
+        /// HasCompletePDU must be called and return true before calling DequeuePDU
+        /// </summary>
+        /// <exception cref="System.IO.InvalidDataException"></exception>
+        public ISCSIPDU DequeuePDU()
+        {
+            ISCSIPDU pdu;
+            try
+            {
+                pdu = ISCSIPDU.GetPDU(m_buffer, m_readOffset);
+            }
+            catch (IndexOutOfRangeException ex)
+            {
+                throw new System.IO.InvalidDataException("Invalid PDU", ex);
+            }
+            RemovePDUBytes();
+            return pdu;
+        }
+
+        /// <summary>
+        /// HasCompletePDU must be called and return true before calling DequeuePDUBytes
+        /// </summary>
+        public byte[] DequeuePDUBytes()
+        {
+            byte[] pduBytes = ByteReader.ReadBytes(m_buffer, m_readOffset, m_pduLength.Value);
+            RemovePDUBytes();
+            return pduBytes;
+        }
+
+        private void RemovePDUBytes()
+        {
+            m_bytesInBuffer -= m_pduLength.Value;
+            if (m_bytesInBuffer == 0)
+            {
+                m_readOffset = 0;
+                m_pduLength = null;
+            }
+            else
+            {
+                m_readOffset += m_pduLength.Value;
+                m_pduLength = null;
+                if (!HasCompletePDU())
+                {
+                    Array.Copy(m_buffer, m_readOffset, m_buffer, 0, m_bytesInBuffer);
+                    m_readOffset = 0;
+                }
+            }
+        }
+
+        public byte[] Buffer
+        {
+            get
+            {
+                return m_buffer;
+            }
+        }
+
+        public int WriteOffset
+        {
+            get
+            {
+                return m_readOffset + m_bytesInBuffer;
+            }
+        }
+
+        public int BytesInBuffer
+        {
+            get
+            {
+                return m_bytesInBuffer;
+            }
+        }
+
+        public int AvailableLength
+        {
+            get
+            {
+                return m_buffer.Length - (m_readOffset + m_bytesInBuffer);
+            }
+        }
+    }
+}

+ 2 - 6
ISCSI/ISCSI.Server/ConnectionState.cs

@@ -19,12 +19,8 @@ namespace ISCSI.Server
     internal class ConnectionState
     {
         public Socket ClientSocket = null;
-        /// <summary>
-        /// DataSegmentLength MUST not exceed MaxRecvDataSegmentLength for the direction it is sent and the total of all the DataSegmentLength of all PDUs in a sequence MUST not exceed MaxBurstLength (or FirstBurstLength for unsolicited data).
-        /// </summary>
-        public const int ReceiveBufferSize = 131072; // Note: FirstBurstLength, MaxBurstLength and MaxRecvDataSegmentLength put a cap on DataSegmentLength, NOT on the PDU length.
-        public byte[] ReceiveBuffer = new byte[ReceiveBufferSize]; // immediate receive buffer
-        public byte[] ConnectionBuffer = new byte[0]; // we append the receive buffer here until we have a complete PDU
+        public static int ReceiveBufferSize = ISCSIPDU.BasicHeaderSegmentLength + ISCSIServer.DeclaredParameters.MaxRecvDataSegmentLength;
+        public ISCSIConnectionReceiveBuffer ReceiveBuffer = new ISCSIConnectionReceiveBuffer(ReceiveBufferSize);
 
         public ConnectionParameters ConnectionParameters = new ConnectionParameters();
 

+ 44 - 74
ISCSI/ISCSI.Server/ISCSIServer.cs

@@ -137,7 +137,6 @@ namespace ISCSI.Server
 
             ConnectionState state = new ConnectionState();
             state.ConnectionParameters.InitiatorEndPoint = clientSocket.RemoteEndPoint as IPEndPoint;
-            state.ReceiveBuffer = new byte[ConnectionState.ReceiveBufferSize];
             // Disable the Nagle Algorithm for this tcp socket:
             clientSocket.NoDelay = true;
             state.ClientSocket = clientSocket;
@@ -148,9 +147,10 @@ namespace ISCSI.Server
             senderThread.IsBackground = true;
             senderThread.Start();
 
+            ISCSIConnectionReceiveBuffer buffer = state.ReceiveBuffer;
             try
             {
-                clientSocket.BeginReceive(state.ReceiveBuffer, 0, ConnectionState.ReceiveBufferSize, 0, ReceiveCallback, state);
+                clientSocket.BeginReceive(buffer.Buffer, buffer.WriteOffset, buffer.AvailableLength, 0, ReceiveCallback, state);
             }
             catch (ObjectDisposedException)
             {
@@ -240,12 +240,13 @@ namespace ISCSI.Server
                 return;
             }
 
-            byte[] currentBuffer = ByteReader.ReadBytes(state.ReceiveBuffer, 0, numberOfBytesReceived);
-            ProcessCurrentBuffer(currentBuffer, state);
+            ISCSIConnectionReceiveBuffer buffer = state.ReceiveBuffer;
+            buffer.SetNumberOfBytesReceived(numberOfBytesReceived);
+            ProcessConnectionBuffer(state);
 
             try
             {
-                clientSocket.BeginReceive(state.ReceiveBuffer, 0, ConnectionState.ReceiveBufferSize, 0, ReceiveCallback, state);
+                clientSocket.BeginReceive(buffer.Buffer, buffer.WriteOffset, buffer.AvailableLength, 0, ReceiveCallback, state);
             }
             catch (ObjectDisposedException)
             {
@@ -285,97 +286,66 @@ namespace ISCSI.Server
             }
         }
 
-        private void ProcessCurrentBuffer(byte[] currentBuffer, ConnectionState state)
+        private void ProcessConnectionBuffer(ConnectionState state)
         {
             Socket clientSocket = state.ClientSocket;
 
-            if (state.ConnectionBuffer.Length == 0)
+            ISCSIConnectionReceiveBuffer buffer = state.ReceiveBuffer;
+            while (buffer.HasCompletePDU())
             {
-                state.ConnectionBuffer = currentBuffer;
-            }
-            else
-            {
-                state.ConnectionBuffer = ByteUtils.Concatenate(state.ConnectionBuffer, currentBuffer);
-            }
-
-            // we now have all PDU bytes received so far in state.ConnectionBuffer
-            int bytesLeftInBuffer = state.ConnectionBuffer.Length;
-
-            while (bytesLeftInBuffer >= 8)
-            {
-                int bufferOffset = state.ConnectionBuffer.Length - bytesLeftInBuffer;
-                int pduLength = ISCSIPDU.GetPDULength(state.ConnectionBuffer, bufferOffset);
-                if (pduLength > bytesLeftInBuffer)
+                ISCSIPDU pdu = null;
+                try
                 {
-                    Log(Severity.Debug, "[{0}][ProcessCurrentBuffer] Bytes left in receive buffer: {1}", state.ConnectionIdentifier, bytesLeftInBuffer);
-                    break;
+                    pdu = buffer.DequeuePDU();
                 }
-                else
+                catch (Exception ex)
                 {
-                    ISCSIPDU pdu = null;
-                    try
-                    {
-                        pdu = ISCSIPDU.GetPDU(state.ConnectionBuffer, bufferOffset);
-                    }
-                    catch (Exception ex)
+                    byte[] pduBytes = buffer.DequeuePDUBytes();
+                    Log(Severity.Error, "[{0}] Failed to read PDU (Exception: {1})", state.ConnectionIdentifier, ex.Message);
+                    RejectPDU reject = new RejectPDU();
+                    reject.Reason = RejectReason.InvalidPDUField;
+                    reject.Data = ByteReader.ReadBytes(pduBytes, 0, 48);
+
+                    state.SendQueue.Enqueue(reject);
+                }
+
+                if (pdu != null)
+                {
+                    if (pdu.GetType() == typeof(ISCSIPDU))
                     {
-                        Log(Severity.Error, "[{0}] Failed to read PDU (Exception: {1})", state.ConnectionIdentifier, ex.Message);
+                        Log(Severity.Error, "[{0}][ProcessCurrentBuffer] Unsupported PDU (0x{1})", state.ConnectionIdentifier, pdu.OpCode.ToString("X"));
+                        // Unsupported PDU
                         RejectPDU reject = new RejectPDU();
-                        reject.Reason = RejectReason.InvalidPDUField;
-                        reject.Data = ByteReader.ReadBytes(state.ConnectionBuffer, bufferOffset, 48);
-
+                        reject.InitiatorTaskTag = pdu.InitiatorTaskTag;
+                        reject.Reason = RejectReason.CommandNotSupported;
+                        reject.Data = ByteReader.ReadBytes(pdu.GetBytes(), 0, 48);
                         state.SendQueue.Enqueue(reject);
                     }
-
-                    bytesLeftInBuffer -= pduLength;
-
-                    if (pdu != null)
+                    else
                     {
-                        if (pdu.GetType() == typeof(ISCSIPDU))
+                        bool valid = ValidateCommandNumbering(pdu, state);
+                        if (valid)
                         {
-                            Log(Severity.Error, "[{0}][ProcessCurrentBuffer] Unsupported PDU (0x{1})", state.ConnectionIdentifier, pdu.OpCode.ToString("X"));
-                            // Unsupported PDU
-                            RejectPDU reject = new RejectPDU();
-                            reject.InitiatorTaskTag = pdu.InitiatorTaskTag;
-                            reject.Reason = RejectReason.CommandNotSupported;
-                            reject.Data = ByteReader.ReadBytes(state.ConnectionBuffer, bufferOffset, 48);
-                            state.SendQueue.Enqueue(reject);
+                            ProcessPDU(pdu, state);
                         }
                         else
                         {
-                            bool valid = ValidateCommandNumbering(pdu, state);
-                            if (valid)
-                            {
-                                ProcessPDU(pdu, state);
-                            }
-                            else
-                            {
-                                // We ignore this PDU
-                                Log(Severity.Warning, "[{0}] Ignoring PDU with CmdSN outside of expected range", state.ConnectionIdentifier);
-                            }
+                            // We ignore this PDU
+                            Log(Severity.Warning, "[{0}] Ignoring PDU with CmdSN outside of expected range", state.ConnectionIdentifier);
                         }
                     }
-                    
-                    if (!clientSocket.Connected)
+                }
+
+                if (!clientSocket.Connected)
+                {
+                    // Do not continue to process the buffer if the other side closed the connection
+                    if (buffer.BytesInBuffer > 0)
                     {
-                        // Do not continue to process the buffer if the other side closed the connection
-                        if (bytesLeftInBuffer > 0)
-                        {
-                            Log(Severity.Debug, "[{0}] Buffer processing aborted, bytes left in receive buffer: {1}", state.ConnectionIdentifier, bytesLeftInBuffer);
-                        }
-                        return;
+                        Log(Severity.Debug, "[{0}] Buffer processing aborted, bytes left in receive buffer: {1}", state.ConnectionIdentifier, buffer.BytesInBuffer);
                     }
+                    return;
                 }
             }
-
-            if (bytesLeftInBuffer > 0)
-            {
-                state.ConnectionBuffer = ByteReader.ReadBytes(state.ConnectionBuffer, state.ConnectionBuffer.Length - bytesLeftInBuffer, bytesLeftInBuffer);
-            }
-            else
-            {
-                state.ConnectionBuffer = new byte[0];
-            }
         }
 
         private void ProcessSendQueue(ConnectionState state)

+ 1 - 0
ISCSI/ISCSI.csproj

@@ -42,6 +42,7 @@
     <Compile Include="ISCSI.Client\ISCSISession.cs" />
     <Compile Include="ISCSI.Client\PDUHelper.cs" />
     <Compile Include="ISCSI.Common\DefaultParameters.cs" />
+    <Compile Include="ISCSI.Common\ISCSIConnectionBuffer.cs" />
     <Compile Include="ISCSI.PDU\Enums\ISCSIOpCodeName.cs" />
     <Compile Include="ISCSI.PDU\Enums\ISCSIResponseName.cs" />
     <Compile Include="ISCSI.PDU\Enums\LoginResponseStatusClassName.cs" />