*編輯:請參閱下面的解答我的答案。從Threading.Timer啓動的任務
以下是否有危險?我試圖追查我認爲可能是一種競爭條件。我想我會從這裏開始並從那裏開始。
private BlockingCollection<MyTaskType>_MainQ = new BlockingCollection<MyTaskType>();
private void Start()
{
_CheckTask = new Timer(new TimerCallback(CheckTasks), null, 10, 5000);
}
private void CheckTasks(object state)
{
_CheckTask.Change(Timeout.Infinite, Timeout.Infinite);
GetTask();
_CheckTask.Change(5000,5000);
}
private void GetTask()
{
//get task from database to object
Task.Factory.StartNew(delegate {
AddToWorkQueue(); //this adds to _MainQ which is a BlockingCollection
});
}
private void AddToWorkQueue()
{
//do some stuff to get stuff to move
_MainQ.Add(dataobject);
}
編輯:我也使用靜態類來處理寫入數據庫。每個調用都應該有自己獨特的數據,從多個線程調用,因此它不共享數據。你認爲這可能是爭議的來源嗎?下面
代碼:
public static void ExecuteNonQuery(string connectionString, string sql, CommandType commandType, List<FastSqlParam> paramCollection = null, int timeout = 60)
{
//Console.WriteLine("{0} [Thread {1}] called ExecuteNonQuery", DateTime.Now.ToString("HH:mm:ss:ffffff"), System.Threading.Thread.CurrentThread.ManagedThreadId);
using (SqlConnection connection = new SqlConnection(connectionString))
using (SqlCommand command = new SqlCommand(sql, connection))
{
try
{
if (paramCollection != null)
{
foreach (FastSqlParam fsqlParam in paramCollection)
{
try
{
SqlParameter param = new SqlParameter();
param.Direction = fsqlParam.ParamDirection;
param.Value = fsqlParam.ParamValue;
param.ParameterName = fsqlParam.ParamName;
param.SqlDbType = fsqlParam.ParamType;
command.Parameters.Add(param);
}
catch (ArgumentNullException anx)
{
throw new Exception("Parameter value was null", anx);
}
catch (InvalidCastException icx)
{
throw new Exception("Could not cast parameter value", icx);
}
}
}
connection.Open();
command.CommandType = commandType;
command.CommandTimeout = timeout;
command.ExecuteNonQuery();
if (paramCollection != null)
{
foreach (FastSqlParam fsqlParam in paramCollection)
{
if (fsqlParam.ParamDirection == ParameterDirection.InputOutput || fsqlParam.ParamDirection == ParameterDirection.Output)
try
{
fsqlParam.ParamValue = command.Parameters[fsqlParam.ParamName].Value;
}
catch (ArgumentNullException anx)
{
throw new Exception("Output parameter value was null", anx);
}
catch (InvalidCastException icx)
{
throw new Exception("Could not cast parameter value", icx);
}
}
}
}
catch (SqlException ex)
{
throw ex;
}
catch (ArgumentException ex)
{
throw ex;
}
}
}
每個請求:
FastSql.ExecuteNonQuery(connectionString, "someProc", System.Data.CommandType.StoredProcedure, new List<FastSqlParam>() { new FastSqlParam(SqlDbType.Int, "@SomeParam", variable)});
同時,我想指出,這個代碼似乎從VS2010 [調試或發行]隨機運行它失敗。當我執行發佈版本時,在將託管它的開發服務器上運行安裝程序,應用程序未能崩潰並且運行平穩。
每個請求:
當前線程架構:
- 線程從數據庫調度表
- 線程A的讀出1結果,如果返回的行,啓動一個
Task
登錄到資源來查看是否有要傳輸的文件。該任務引用的對象包含使用靜態調用創建的DataTable
中的數據。基本如下。 如果有找到的文件,
Task
增加_MainQ要移動的文件//Called from Thread A void ProcessTask() { var parameters = new List<FastSqlParam>() { new FastSqlParam(SqlDbType.Int, "@SomeParam", variable) }; using (DataTable someTable = FastSql.ExecuteDataTable(connectionString, "someProc", CommandType.StoredProcedure, parameters)) { SomeTask task = new Task(); //assign task some data from dt.Rows[0] if (task != null) { Task.Factory.StartNew(delegate { AddFilesToQueue(task); }); } } } void AddFilesToQueue(Task task) { //connect to remote system and build collection of files to WorkItem //e.g, WorkItem will have a collection of collections to transfer. We control this throttling mechanism to allow more threads to split up the work _MainQ.Add(WorkItem); }
你覺得有可能是返回從FastSql.ExecuteDataTable
值的問題,因爲它是一個靜態類,然後使用它與using
塊?
你能發表一段代碼,在隊列中調用'Take'嗎?另外,顯示何時調用ExecuteNonQuery也會很有用。 – 2010-07-23 14:30:00
對不起,我不是很清楚。我真的很想看看在哪個線程上執行ExecuteNonQuery的代碼。我假設它是調用'Take'的線程。如果是這種情況,那麼你至少有3個線程在這裏玩1)調用'Add' 2)定時器回調本身3)和一個調用'Take'。全面瞭解線程如何交互是識別任何可能競爭條件的唯一可靠方法。 – 2010-07-23 16:22:05
不,我不認爲它與返回'FastSql.ExecuteDataTable'是'using'的目標有關。我仍然看不到'BlockingCollection.Take'方法被調用的地方。 – 2010-07-23 20:13:29