--近期有一个需要运用多线程的项目,可能会出现并发的情况,所以写了这份代码。可能有些地方还不完善,后续有需求再改。
/// <summary>
/// Queue entry handed to the concurrent workers; it wraps the
/// <see cref="MeterInfo"/> that a worker should process.
/// </summary>
public class MeterAsyncQueue
{
    // A fresh MeterInfo is created for every new entry, so the property is
    // never null unless a caller explicitly assigns null.
    private MeterInfo _meterInfoTask = new MeterInfo();

    /// <summary>
    /// Payload of this queue entry.
    /// </summary>
    public MeterInfo MeterInfoTask
    {
        get { return _meterInfoTask; }
        set { _meterInfoTask = value; }
    }
}
/// <summary>
/// Data carried by a queue entry; currently only an integer identifier.
/// </summary>
public class MeterInfo
{
    // Backing storage for Id; defaults to 0 on a new instance.
    private int _id;

    /// <summary>
    /// Identifier of the item to process.
    /// </summary>
    public int Id
    {
        get { return _id; }
        set { _id = value; }
    }
}
知识兔
/// <summary>
/// Runs a batch of ids through <see cref="Execute"/> with a bounded number of
/// concurrent workers, and lets the caller pause, resume and cancel the run.
/// </summary>
/// <remarks>
/// Cancellation is cooperative: workers that are already inside
/// <see cref="Execute"/> finish their current item, no further items are
/// started. Starting a second run while a previous one is still active is not
/// supported by this class (both runs would share the same queue state).
/// </remarks>
public class TaskCommand
{
    /// <summary>Serializes the scheduling decisions made in <see cref="HandlingTask"/>.</summary>
    private readonly object _objLock = new object();

    /// <summary>Guards <see cref="AsyncQueues"/> and <see cref="ParallelTasks"/>.</summary>
    private readonly object _queueLock = new object();

    /// <summary>
    /// Pause gate: signalled means "run", non-signalled means "paused".
    /// It is reused across runs and deliberately never disposed while workers
    /// may still wait on it.
    /// </summary>
    private readonly ManualResetEvent resetEvent = new ManualResetEvent(true);

    /// <summary>Cancellation source of the current run; replaced by <see cref="StartData"/>.</summary>
    private CancellationTokenSource tokenSource = new CancellationTokenSource();

    /// <summary>Coordinator thread that executes <see cref="StartTask"/> for the current run.</summary>
    private Thread thread = null;

    /// <summary>Pending work items.</summary>
    private Queue<MeterAsyncQueue> AsyncQueues { get; set; }

    /// <summary>Maximum number of items processed concurrently.</summary>
    private int ParallelTaskCount { get; set; }

    /// <summary>Tasks of the current run that have not been observed as completed yet.</summary>
    private List<Task> ParallelTasks { get; set; }

    /// <summary>True until the first scheduling pass of a run has filled all worker slots.</summary>
    private bool IsInitTask { get; set; }

    /// <summary>
    /// Starts a new run over the ids 0..9999 on a dedicated coordinator thread.
    /// </summary>
    public void StartData()
    {
        // A cancelled CancellationTokenSource cannot be reused, so every run
        // gets a fresh one; the pause gate is simply re-opened.
        tokenSource = new CancellationTokenSource();
        resetEvent.Set();

        List<int> Ids = new List<int>(10000);
        for (int i = 0; i < 10000; i++)
        {
            Ids.Add(i);
        }
        thread = new Thread(new ThreadStart(() => StartTask(Ids)));
        thread.Start();
    }

    /// <summary>
    /// Pauses the run: workers block before processing their next item.
    /// </summary>
    public void OutData()
    {
        resetEvent.Reset();
    }

    /// <summary>
    /// Resumes a paused run.
    /// </summary>
    public void ContinueData()
    {
        resetEvent.Set();
    }

    /// <summary>
    /// Cancels the run cooperatively. Safe to call before any run was started
    /// and safe to call more than once.
    /// </summary>
    public void Cancel()
    {
        // One Cancel() on the shared source reaches every task of the run;
        // there is no need to inspect individual task states.
        tokenSource.Cancel();

        // Release workers that are parked on the pause gate so they can see
        // the cancellation and exit. The gate is NOT disposed here: workers
        // may still be about to wait on it, and Thread.Abort is not used
        // because it is unsupported on .NET Core / .NET 5+.
        resetEvent.Set();
    }

    /// <summary>
    /// Processes a single item: waits while the run is paused, writes a line
    /// to the console and then sleeps for one second.
    /// </summary>
    /// <param name="Index">Identifier of the item being processed.</param>
    public void Execute(int Index)
    {
        // Block here while the run is paused.
        resetEvent.WaitOne();

        Console.WriteLine("当前第" + Index + "个线程");

        Thread.Sleep(1000);
    }

    /// <summary>
    /// Processes all <paramref name="Ids"/> with at most five concurrent
    /// workers and blocks until every scheduled item has finished or the run
    /// was cancelled.
    /// </summary>
    /// <param name="Ids">Identifiers to process; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="Ids"/> is null.</exception>
    public void StartTask(List<int> Ids)
    {
        if (Ids == null)
        {
            throw new ArgumentNullException("Ids");
        }

        IsInitTask = true;
        ParallelTasks = new List<Task>();
        AsyncQueues = new Queue<MeterAsyncQueue>();
        ParallelTaskCount = 5;

        InitAsyncQueue(Ids);
        HandlingTask();
        WaitForCompletion();
    }

    /// <summary>
    /// Blocks until no task of the current run is outstanding.
    /// </summary>
    private void WaitForCompletion()
    {
        while (true)
        {
            Task[] pending;
            lock (_queueLock)
            {
                // A continuation schedules its successor before it completes,
                // so once a task is completed its successor (if any) is
                // already in the list and dropping the finished one is safe.
                ParallelTasks.RemoveAll(t => t.IsCompleted);
                pending = ParallelTasks.ToArray();
            }

            if (pending.Length == 0)
            {
                return;
            }

            try
            {
                Task.WaitAll(pending);
            }
            catch (AggregateException ex)
            {
                // Cancelled tasks are the expected outcome of Cancel();
                // anything else is rethrown by Handle.
                ex.Handle(e => e is OperationCanceledException);
            }
        }
    }

    /// <summary>
    /// Fills the work queue with one entry per id.
    /// </summary>
    private void InitAsyncQueue(List<int> Ids)
    {
        foreach (var item in Ids)
        {
            MeterInfo info = new MeterInfo();
            info.Id = item;
            AsyncQueues.Enqueue(new MeterAsyncQueue()
            {
                MeterInfoTask = info
            });
        }
    }

    /// <summary>
    /// Schedules work: fills every worker slot on the first pass of a run and
    /// one slot (the one that just became free) on every later pass.
    /// </summary>
    private void HandlingTask()
    {
        lock (_objLock)
        {
            if (tokenSource.IsCancellationRequested || AsyncQueues.Count <= 0)
            {
                return;
            }

            var loopCount = GetAvailableTaskCount();
            for (int i = 0; i < loopCount; i++)
            {
                HandlingQueue();
            }
            IsInitTask = false;
        }
    }

    /// <summary>
    /// Dequeues one item, if any, and starts a worker task for it. When the
    /// worker finishes normally, its continuation schedules the next item.
    /// </summary>
    private void HandlingQueue()
    {
        CancellationToken token = tokenSource.Token;
        lock (_queueLock)
        {
            if (AsyncQueues.Count <= 0)
            {
                return;
            }

            var asyncQueue = AsyncQueues.Dequeue();
            if (asyncQueue == null)
            {
                return;
            }

            var task = Task.Factory.StartNew(() =>
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }

                // Park while the run is paused.
                resetEvent.WaitOne();

                // Cancel() opens the gate to wake paused workers, so the
                // token must be checked again before doing any work.
                if (token.IsCancellationRequested)
                {
                    return;
                }

                Execute(asyncQueue.MeterInfoTask.Id);
            }, token).ContinueWith(t =>
            {
                HandlingTask();
            }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);

            ParallelTasks.Add(task);
        }
    }

    /// <summary>
    /// Number of worker slots to fill in the current scheduling pass.
    /// Only called while <see cref="_objLock"/> is held.
    /// </summary>
    /// <returns>
    /// <see cref="ParallelTaskCount"/> on the first pass of a run, otherwise 1.
    /// </returns>
    private int GetAvailableTaskCount()
    {
        if (IsInitTask)
        {
            return ParallelTaskCount;
        }
        return 1;
    }
}
知识兔
-
计算机
-