using Microsoft.VisualBasic; using System; using System.Collections; using System.Collections.Generic; using System.Data; using System.Diagnostics; using System.Threading; public delegate void ThreadErrorHandlerDelegate(ThreadPoolWorkItem oWorkItem, Exception oError); public class ThreadPoolWorkItem { public bool m_bStoreOutput = false; public string m_sName = ""; public Delegate m_pMethod = null; public object[] m_pInput = null; public object m_oOutput = null; public Exception m_oException = null; public ThreadPoolWorkItem() { } public ThreadPoolWorkItem(string sName, Delegate pMethod, object[] pInput, bool bStoreOutput) { m_sName = sName; m_pMethod = pMethod; m_pInput = pInput; m_bStoreOutput = bStoreOutput; } } public class XYThreadPool { private Hashtable m_htThreads = new Hashtable(256); private int m_nMinThreadCount = 5; private int m_nMaxThreadCount = 10; private int m_nShutdownPause = 200; private int m_nServerPause = 25; private bool m_bContinue = false; private static Exception m_oException = null; private Queue m_qInput = new Queue(1024); private Queue m_qOutput = new Queue(1024); private Delegate m_delegateThreadErrorHandler = new ThreadErrorHandlerDelegate(OnThreadError); private void ThreadProc() { while (m_bContinue) { object obj = null; Monitor.Enter(this); if (m_qInput.Count > 0) obj = m_qInput.Dequeue(); Monitor.Exit(this); if (obj == null) { bool bQuit = false; Monitor.Enter(this); if (m_htThreads.Count > m_nMinThreadCount) { m_htThreads.Remove(Thread.CurrentThread.Name); bQuit = true; } Monitor.Exit(this); if (bQuit) return; Thread.Sleep(10 * m_nServerPause); } else { ThreadPoolWorkItem oWorkItem = (ThreadPoolWorkItem)obj; //oWorkItem.m_oOutput = oWorkItem.m_pMethod.DynamicInvoke(oWorkItem.m_pInput) try { oWorkItem.m_oOutput = oWorkItem.m_pMethod.DynamicInvoke(oWorkItem.m_pInput); } catch (Exception oBug) { if ((m_delegateThreadErrorHandler != null)) { try { object[] pInput = { oWorkItem, oBug }; m_delegateThreadErrorHandler.DynamicInvoke(pInput); } catch { } } } if (oWorkItem.m_bStoreOutput) { Monitor.Enter(m_qOutput); m_qOutput.Enqueue(oWorkItem); Monitor.Exit(m_qOutput); } Thread.Sleep(m_nServerPause); } } } private static void OnThreadError(ThreadPoolWorkItem oWorkItem, Exception oError) { if (oWorkItem == null) { m_oException = oError; } else { oWorkItem.m_oException = oError; } } public void SetThreadErrorHandler(ThreadErrorHandlerDelegate pMethod) { Monitor.Enter(this); m_delegateThreadErrorHandler = pMethod; Monitor.Exit(this); } public void SetServerPause(int nMilliseconds) { Monitor.Enter(this); if (nMilliseconds > 9 & nMilliseconds < 101) m_nServerPause = nMilliseconds; Monitor.Exit(this); } public void SetShutdownPause(int nMilliseconds) { Monitor.Enter(this); m_nShutdownPause = nMilliseconds; Monitor.Exit(this); } public Exception GetException() { return m_oException; } public void InsertWorkItem(ThreadPoolWorkItem oWorkItem) { try { Monitor.Enter(this); m_qInput.Enqueue(oWorkItem); if (m_bContinue && m_qInput.Count > m_htThreads.Count && m_htThreads.Count < m_nMaxThreadCount) { Thread th = new Thread(ThreadProc); th.Name = Guid.NewGuid().ToString(); m_htThreads.Add(th.Name, th); th.Start(); } } catch (Exception oBug) { m_oException = oBug; } finally { Monitor.Exit(this); } } public void InsertWorkItem(string sName, Delegate pMethod, object[] pArgs, bool bStoreOutput) { InsertWorkItem(new ThreadPoolWorkItem(sName, pMethod, pArgs, bStoreOutput)); } public ThreadPoolWorkItem ExtractWorkItem() { object oWorkItem = null; Monitor.Enter(m_qOutput); if (m_qOutput.Count > 0) oWorkItem = m_qOutput.Dequeue(); Monitor.Exit(m_qOutput); if (oWorkItem == null) return null; return (ThreadPoolWorkItem)oWorkItem; } public bool StartThreadPool(int nMinThreadCount = 5, int nMaxThreadCount = 10) { try { Monitor.Enter(this); if (m_bContinue == false) { m_bContinue = true; if (nMinThreadCount > 0) { m_nMinThreadCount = nMinThreadCount; } if (nMaxThreadCount > m_nMinThreadCount) { m_nMaxThreadCount = nMaxThreadCount; } else { m_nMaxThreadCount = 2 * m_nMinThreadCount; } int i = 0; for (i = 1; i <= m_nMinThreadCount; i++) { Thread th = new Thread(ThreadProc); th.Name = Guid.NewGuid().ToString(); m_htThreads.Add(th.Name, th); th.Start(); } } return true; } catch (Exception oBug) { m_bContinue = false; m_oException = oBug; return false; } finally { Monitor.Exit(this); } } public void StopThreadPool() { Monitor.Enter(this); m_bContinue = false; Thread.Sleep(Math.Max(200, m_nShutdownPause)); if ((m_nShutdownPause > 0)) { IDictionaryEnumerator dict = m_htThreads.GetEnumerator(); while (dict.MoveNext()) { Thread th = (Thread)dict.Value; if (th.IsAlive) { try { th.Abort(); } catch { } } } } m_htThreads.Clear(); m_qInput.Clear(); // m_qOutput.Clear() Monitor.Exit(this); } public int GetThreadCount() { Monitor.Enter(this); int nCount = m_htThreads.Count; Monitor.Exit(this); return nCount; } }