Browse Source

Each connection now has a dedicaded thread for send operations

Tal Aloni 8 years ago
parent
commit
0ac96e5b7d

+ 1 - 0
ISCSI/Server/ConnectionState.cs

@@ -31,6 +31,7 @@ namespace ISCSI.Server
         public ConnectionParameters ConnectionParameters = new ConnectionParameters();
 
         public CountdownLatch RunningSCSICommands = new CountdownLatch();
+        public BlockingQueue<ISCSIPDU> SendQueue = new BlockingQueue<ISCSIPDU>();
 
         public string ConnectionIdentifier
         {

+ 66 - 36
ISCSI/Server/ISCSIServer.cs

@@ -100,6 +100,13 @@ namespace ISCSI.Server
             // Disable the Nagle Algorithm for this tcp socket:
             clientSocket.NoDelay = true;
             state.ClientSocket = clientSocket;
+            Thread senderThread = new Thread(delegate()
+            {
+                ProcessSendQueue(state);
+            });
+            senderThread.IsBackground = true;
+            senderThread.Start();
+
             try
             {
                 clientSocket.BeginReceive(state.ReceiveBuffer, 0, ConnectionState.ReceiveBufferSize, 0, ReceiveCallback, state);
@@ -159,6 +166,7 @@ namespace ISCSI.Server
                 Log(Severity.Verbose, "The initiator has closed the connection");
                 // Wait for pending I/O to complete.
                 state.RunningSCSICommands.WaitUntilZero();
+                state.SendQueue.Stop();
                 m_connectionManager.RemoveConnection(state);
                 return;
             }
@@ -221,7 +229,7 @@ namespace ISCSI.Server
                         reject.Reason = RejectReason.InvalidPDUField;
                         reject.Data = ByteReader.ReadBytes(pduBytes, 0, 48);
 
-                        TrySendPDU(state, reject);
+                        state.SendQueue.Enqueue(reject);
                     }
 
                     if (pdu != null)
@@ -234,7 +242,7 @@ namespace ISCSI.Server
                             reject.InitiatorTaskTag = pdu.InitiatorTaskTag;
                             reject.Reason = RejectReason.CommandNotSupported;
                             reject.Data = ByteReader.ReadBytes(pduBytes, 0, 48);
-                            TrySendPDU(state, reject);
+                            state.SendQueue.Enqueue(reject);
                         }
                         else
                         {
@@ -318,6 +326,7 @@ namespace ISCSI.Server
                             // Wait for pending I/O to complete.
                             existingConnection.RunningSCSICommands.WaitUntilZero();
                             SocketUtils.ReleaseSocket(existingConnection.ClientSocket);
+                            existingConnection.SendQueue.Stop();
                             m_connectionManager.RemoveConnection(existingConnection);
                             Log(Severity.Verbose, "[{0}] Implicit logout completed", state.ConnectionIdentifier);
                         }
@@ -330,7 +339,7 @@ namespace ISCSI.Server
                         m_connectionManager.AddConnection(state);
                     }
                     Log(Severity.Verbose, "[{0}] Login Response parameters: {1}", state.ConnectionIdentifier, KeyValuePairUtils.ToString(response.LoginParameters));
-                    TrySendPDU(state, response);
+                    state.SendQueue.Enqueue(response);
                 }
                 else
                 {
@@ -349,8 +358,7 @@ namespace ISCSI.Server
                         LoginResponsePDU loginResponse = new LoginResponsePDU();
                         loginResponse.TSIH = state.SessionParameters.TSIH;
                         loginResponse.Status = LoginResponseStatusName.InvalidDuringLogon;
-                        TrySendPDU(state, loginResponse);
-                        clientSocket.Close();
+                        state.SendQueue.Enqueue(loginResponse);
                     }
                 }
             }
@@ -360,7 +368,7 @@ namespace ISCSI.Server
                 {
                     TextRequestPDU request = (TextRequestPDU)pdu;
                     TextResponsePDU response = ServerResponseHelper.GetTextResponsePDU(request, m_targets);
-                    TrySendPDU(state, response);
+                    state.SendQueue.Enqueue(response);
                 }
                 else if (pdu is LogoutRequestPDU)
                 {
@@ -372,7 +380,7 @@ namespace ISCSI.Server
                         RejectPDU reject = new RejectPDU();
                         reject.Reason = RejectReason.ProtocolError;
                         reject.Data = ByteReader.ReadBytes(pdu.GetBytes(), 0, 48);
-                        TrySendPDU(state, reject);
+                        state.SendQueue.Enqueue(reject);
                     }
                     else
                     {
@@ -403,8 +411,8 @@ namespace ISCSI.Server
                             m_connectionManager.RemoveConnection(connection);
                         }
                         LogoutResponsePDU response = ServerResponseHelper.GetLogoutResponsePDU(request);
-                        TrySendPDU(state, response);
-                        clientSocket.Close(); // We can close the connection now
+                        state.SendQueue.Enqueue(response);
+                        // connection will be closed after a LogoutResponsePDU has been sent.
                     }
                 }
                 else if (state.SessionParameters.IsDiscovery)
@@ -416,7 +424,7 @@ namespace ISCSI.Server
                     reject.Reason = RejectReason.ProtocolError;
                     reject.Data = ByteReader.ReadBytes(pdu.GetBytes(), 0, 48);
 
-                    TrySendPDU(state, reject);
+                    state.SendQueue.Enqueue(reject);
                 }
                 else if (pdu is NOPOutPDU)
                 {
@@ -424,7 +432,7 @@ namespace ISCSI.Server
                     if (request.InitiatorTaskTag != 0xFFFFFFFF)
                     {
                         NOPInPDU response = ServerResponseHelper.GetNOPResponsePDU(request);
-                        TrySendPDU(state, response);
+                        state.SendQueue.Enqueue(response);
                     }
                 }
                 else if (pdu is SCSIDataOutPDU || pdu is SCSICommandPDU)
@@ -448,7 +456,7 @@ namespace ISCSI.Server
                             reject.InitiatorTaskTag = request.InitiatorTaskTag;
                             reject.Reason = RejectReason.InvalidPDUField;
                             reject.Data = ByteReader.ReadBytes(request.GetBytes(), 0, 48);
-                            TrySendPDU(state, reject);
+                            state.SendQueue.Enqueue(reject);
                         }
                     }
                     else
@@ -459,7 +467,7 @@ namespace ISCSI.Server
                     }
                     foreach (ReadyToTransferPDU readyToTransferPDU in readyToTransferPDUs)
                     {
-                        TrySendPDU(state, readyToTransferPDU);
+                        state.SendQueue.Enqueue(readyToTransferPDU);
                     }
                     if (commandsToExecute != null)
                     {
@@ -473,15 +481,7 @@ namespace ISCSI.Server
                         state.RunningSCSICommands.Decrement();
                         responseList.AddRange(commandResponseList);
                     }
-
-                    foreach (ISCSIPDU response in responseList)
-                    {
-                        TrySendPDU(state, response);
-                        if (!clientSocket.Connected)
-                        {
-                            return;
-                        }
-                    }
+                    state.SendQueue.Enqueue(responseList);
                 }
                 else if (pdu is LoginRequestPDU)
                 {
@@ -493,7 +493,7 @@ namespace ISCSI.Server
                     reject.Reason = RejectReason.ProtocolError;
                     reject.Data = ByteReader.ReadBytes(pdu.GetBytes(), 0, 48);
 
-                    TrySendPDU(state, reject);
+                    state.SendQueue.Enqueue(reject);
                 }
                 else
                 {
@@ -502,17 +502,24 @@ namespace ISCSI.Server
                     reject.Reason = RejectReason.CommandNotSupported;
                     reject.Data = ByteReader.ReadBytes(pdu.GetBytes(), 0, 48);
 
-                    TrySendPDU(state, reject);
+                    state.SendQueue.Enqueue(reject);
                 }
             }
             Log(Severity.Trace, "Leaving ProcessPDU");
         }
 
-        private void TrySendPDU(ConnectionState state, ISCSIPDU response)
+        private void ProcessSendQueue(ConnectionState state)
         {
-            Socket clientSocket = state.ClientSocket;
-            try
+            while (true)
             {
+                Log(Severity.Trace, "Entering ProcessSendQueue");
+                ISCSIPDU response;
+                bool stopped = !state.SendQueue.TryDequeue(out response);
+                if (stopped)
+                {
+                    return;
+                }
+                Socket clientSocket = state.ClientSocket;
                 PDUHelper.SetStatSN(response, state.ConnectionParameters.StatSN);
                 PDUHelper.SetExpCmdSN(response, state.SessionParameters.ExpCmdSN, state.SessionParameters.ExpCmdSN + state.SessionParameters.CommandQueueSize);
                 if (response is SCSIResponsePDU ||
@@ -523,15 +530,38 @@ namespace ISCSI.Server
                 {
                     state.ConnectionParameters.StatSN++;
                 }
-                clientSocket.Send(response.GetBytes());
-                Log(Severity.Debug, "[{0}] Sent response to initator, Operation: {1}, Size: {2}", state.ConnectionIdentifier, response.OpCode, response.Length);
-            }
-            catch (SocketException ex)
-            {
-                Log(Severity.Debug, "[{0}] Failed to send response to initator (Operation: {1}, Size: {2}), SocketException: {3}", state.ConnectionIdentifier, response.OpCode, response.Length, ex.Message);
-            }
-            catch (ObjectDisposedException)
-            {
+                try
+                {
+                    clientSocket.Send(response.GetBytes());
+                    Log(Severity.Verbose, "[{0}] Sent response to initator, Operation: {1}, Size: {2}", state.ConnectionIdentifier, response.OpCode, response.Length);
+                    if (response is LogoutResponsePDU)
+                    {
+                        clientSocket.Close(); // We can close the connection now
+                        Log(Severity.Trace, "Leaving ProcessSendQueue");
+                        return;
+                    }
+                    else if (response is LoginResponsePDU)
+                    {
+                        if (((LoginResponsePDU)response).Status == LoginResponseStatusName.InvalidDuringLogon)
+                        {
+                            clientSocket.Close(); // We can close the connection now
+                            Log(Severity.Trace, "Leaving ProcessSendQueue");
+                            return;                            
+                        }
+                    }
+                }
+                catch (SocketException ex)
+                {
+                    Log(Severity.Verbose, "[{0}] Failed to send response to initator. Operation: {1}, Size: {2}, SocketException: {3}", state.ConnectionIdentifier, response.OpCode, response.Length, ex.Message);
+                    Log(Severity.Trace, "Leaving ProcessSendQueue");
+                    return;
+                }
+                catch (ObjectDisposedException)
+                {
+                    Log(Severity.Verbose, "[{0}] Failed to send response to initator. Operation: {1}, Size: {2}. ObjectDisposedException", state.ConnectionIdentifier, response.OpCode, response.Length);
+                    Log(Severity.Trace, "Leaving ProcessSendQueue");
+                    return;
+                }
             }
         }
 

+ 85 - 0
Utilities/Generics/BlockingQueue.cs

@@ -0,0 +1,85 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Text;
+
+namespace Utilities
+{
+    public class BlockingQueue<T>
+    {
+        private Queue<T> m_queue = new Queue<T>();
+        private int m_count = 0;
+        private bool m_stopping;
+
+        public void Enqueue(T item)
+        {
+            lock (m_queue)
+            {
+                m_queue.Enqueue(item);
+                m_count++;
+                if (m_queue.Count == 1)
+                {
+                    Monitor.Pulse(m_queue);
+                }
+            }
+        }
+
+        public void Enqueue(List<T> items)
+        {
+            if (items.Count == 0)
+            {
+                return;
+            }
+            lock (m_queue)
+            {
+                foreach (T item in items)
+                {
+                    m_queue.Enqueue(item);
+                    m_count++;
+                }
+                if (m_queue.Count == items.Count)
+                {
+                    Monitor.Pulse(m_queue);
+                }
+            }
+        }
+
+        /// <returns>Will return false if the BlockingQueue is stopped</returns>
+        public bool TryDequeue(out T item)
+        {
+            lock (m_queue)
+            {
+                while (m_queue.Count == 0)
+                {
+                    Monitor.Wait(m_queue);
+                    if (m_stopping)
+                    {
+                        item = default(T);
+                        return false;
+                    }
+                }
+
+                item = m_queue.Dequeue();
+                m_count--;
+                return true;
+            }
+        }
+
+        public void Stop()
+        {
+            lock (m_queue)
+            {
+                m_stopping = true;
+                Monitor.PulseAll(m_queue);
+            }
+        }
+
+        public int Count
+        {
+            get
+            {
+                return m_count;
+            }
+        }
+    }
+}

+ 1 - 0
Utilities/Utilities.csproj

@@ -42,6 +42,7 @@
     <Compile Include="Conversion\Conversion.SimpleTypes.cs" />
     <Compile Include="Conversion\LittleEndianConverter.cs" />
     <Compile Include="Cryptography\CRC32.cs" />
+    <Compile Include="Generics\BlockingQueue.cs" />
     <Compile Include="Generics\KeyValuePairList.cs" />
     <Compile Include="Generics\Map.cs" />
     <Compile Include="IFileSystem\FileSystem.cs" />