From d19fa3ce32cc639784114d0c4d0e0476cdfd1f94 Mon Sep 17 00:00:00 2001
From: falcon <9504402@qq.com>
Date: Thu, 2 Jan 2020 15:10:08 +0800
Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E5=BC=80?=
=?UTF-8?q?=E5=8F=91=EF=BC=8C=E7=AE=80=E5=8D=95=E6=B5=8B=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
Falcon.TaskScheduling.sln | 8 +-
Falcon.TaskScheduling/ConcurrentQueueTask.cs | 182 ++++++++++++++++++
.../Falcon.TaskScheduling.csproj | 3 +
Falcon.TaskScheduling/ITaskScheduler.cs | 35 ++++
Falcon.TaskScheduling/ITaskUnit.cs | 24 +++
Falcon.TaskScheduling/NotSupportException.cs | 15 ++
Falcon.TaskScheduling/QueueUnit.cs | 27 +++
Falcon.TaskScheduling/TaskSchedulerOption.cs | 26 +++
.../ConcurrentQueueTaskTests.cs | 132 +++++++++++++
.../Falcon.TaskSchedulingTests.csproj | 22 +++
Falcon.TaskSchedulingTests/TaskUnit.cs | 43 +++++
11 files changed, 516 insertions(+), 1 deletion(-)
create mode 100644 Falcon.TaskScheduling/ConcurrentQueueTask.cs
create mode 100644 Falcon.TaskScheduling/ITaskScheduler.cs
create mode 100644 Falcon.TaskScheduling/ITaskUnit.cs
create mode 100644 Falcon.TaskScheduling/NotSupportException.cs
create mode 100644 Falcon.TaskScheduling/QueueUnit.cs
create mode 100644 Falcon.TaskScheduling/TaskSchedulerOption.cs
create mode 100644 Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs
create mode 100644 Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj
create mode 100644 Falcon.TaskSchedulingTests/TaskUnit.cs
diff --git a/Falcon.TaskScheduling.sln b/Falcon.TaskScheduling.sln
index 8b813e1..0b10b80 100644
--- a/Falcon.TaskScheduling.sln
+++ b/Falcon.TaskScheduling.sln
@@ -3,7 +3,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29613.14
MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Falcon.TaskScheduling", "Falcon.TaskScheduling\Falcon.TaskScheduling.csproj", "{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Falcon.TaskScheduling", "Falcon.TaskScheduling\Falcon.TaskScheduling.csproj", "{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Falcon.TaskSchedulingTests", "Falcon.TaskSchedulingTests\Falcon.TaskSchedulingTests.csproj", "{2FD44ACF-CEE9-4A50-870E-28F85865066F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -15,6 +17,10 @@ Global
{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/Falcon.TaskScheduling/ConcurrentQueueTask.cs b/Falcon.TaskScheduling/ConcurrentQueueTask.cs
new file mode 100644
index 0000000..1710874
--- /dev/null
+++ b/Falcon.TaskScheduling/ConcurrentQueueTask.cs
@@ -0,0 +1,182 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+using Falcon.Extend;
+using System.Linq;
+using System.Threading;
+
+namespace Falcon.TaskScheduling
+{
+ ///
+ /// 安全队列实现
+ ///
+ public class ConcurrentQueueTask:ITaskScheduler
+ {
+ ///
+ /// 选项
+ ///
+ public TaskSchedulerOption Option { get; set; }
+ ///
+ /// 系统级队列
+ ///
+ public ConcurrentQueue SystemQueue;
+ ///
+ /// 优先级队列
+ ///
+ public ConcurrentQueue FirstQueue;
+ ///
+ /// 空闲级队列
+ ///
+ public ConcurrentQueue IdleQueue;
+ ///
+ /// 任务队列
+ ///
+ public List Queue;
+
+ ///
+ /// 通过提供的选项实例化任务调度程序
+ ///
+ /// 配置参数
+ public ConcurrentQueueTask(TaskSchedulerOption option = null) {
+ this.Option = option ?? new TaskSchedulerOption();
+ if(this.Option.EnableSystemQueue) {
+ this.SystemQueue = GetNewQueue();
+ }
+ if(this.Option.EnableFirstQueue) {
+ this.FirstQueue = GetNewQueue();
+ }
+ if(this.Option.EnableIdleQueue) {
+ this.IdleQueue = GetNewQueue();
+ }
+ this.Queue = new List();
+ if(this.Option.QueueDefine.IsNotNullOrEmpty()) {
+ var spc = ",;,;";
+ var q = this.Option.QueueDefine.Split(spc.ToCharArray());
+ foreach(var i in q) {
+ if(int.TryParse(i,out var number)) {
+ this.Queue.Add(new QueueUnit {
+ Queue = GetNewQueue(),
+ Weight = number,
+ });
+ } else {
+ throw new Exception($"QueueDefine值{this.Option.QueueDefine}定义错误");
+ }
+ }
+ }
+ }
+ ///
+ /// 获取一个新的安全队列
+ ///
+ protected ConcurrentQueue GetNewQueue() {
+ return new ConcurrentQueue();
+ }
+
+ public void AddFirstQueue(ITaskUnit task) {
+ if(!this.Option.EnableFirstQueue) {
+ throw new NotSupportedException();
+ }
+ this.FirstQueue.Enqueue(task);
+ }
+
+ public void AddIdleQueue(ITaskUnit task) {
+ if(!this.Option.EnableIdleQueue) {
+ throw new NotSupportedException();
+ }
+ this.IdleQueue.Enqueue(task);
+ }
+
+ public void AddSystemQueue(ITaskUnit task) {
+ if(!this.Option.EnableSystemQueue) {
+ throw new NotSupportedException();
+ }
+ this.SystemQueue.Enqueue(task);
+ }
+
+ public void AddQueue(int weight,ITaskUnit task) {
+ var c = this.Queue.Count();
+ if(c == 0) {
+ throw new NotSupportedException();
+ }
+ var queue = this.Queue.Where(m => m.Weight == weight);
+ if(!queue.Any()) {
+ throw new Exception("添加到任务队列提供的weight值错误。必须提供TaskSchedulerOption.QueueDefine中定义的值。");
+ }
+ queue.First().Queue.Enqueue(task);
+ }
+
+ private bool _isRun = false;
+ private static object _lockObj = new object();
+ ///
+ /// 开始执行调度任务
+ ///
+ public void Start() {
+ if(!_isRun) {
+ lock(_lockObj) {
+ if(!_isRun) {
+ _isRun = true;
+ var task = GetNextTask();
+ while(task != null) {
+ object result = null;
+ Exception exp = null;
+ try {
+ result = task.Run();
+ } catch(Exception ex) {
+ exp = ex;
+ }
+ task.TaskCallback(result,exp);
+ Thread.Sleep(0);
+ task = GetNextTask();
+ }
+ _isRun = false;
+ }
+ }
+ }
+ }
+
+ protected ITaskUnit GetNextTask() {
+ ITaskUnit t;
+ if(this.Option.EnableSystemQueue) {
+ if(this.SystemQueue.TryDequeue(out t)) {
+ return t;
+ }
+ }
+ if(this.Option.EnableFirstQueue) {
+ if(this.FirstQueue.TryDequeue(out t)) {
+ return t;
+ }
+ }
+ t = GetTaskFromQueue();
+ if(t != null) {
+ return t;
+ }
+ if(this.Option.EnableIdleQueue) {
+ if(this.IdleQueue.TryDequeue(out t)) {
+ return t;
+ }
+ }
+ return null;
+ }
+
+ private ITaskUnit GetTaskFromQueue() {
+ var ql = this.Queue.Where(m => m.HasTask);
+ var weight = ql.Sum(m => m.Weight);
+ var point = GetRander(weight);
+ foreach(var q in ql) {
+ if(point < q.Weight) {
+ if(q.Queue.TryDequeue(out var tu)) {
+ return tu;
+ }
+ }
+ point -= q.Weight;
+ }
+ return null;
+ }
+
+ private static Random RandomNum = new Random();
+
+ protected int GetRander(int max) {
+ return RandomNum.Next(0,max);
+ }
+ }
+}
diff --git a/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj b/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj
index 65e7db3..90de67f 100644
--- a/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj
+++ b/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj
@@ -9,4 +9,7 @@
多任务、多优先级任务调度组件
Falcon
+
+
+
diff --git a/Falcon.TaskScheduling/ITaskScheduler.cs b/Falcon.TaskScheduling/ITaskScheduler.cs
new file mode 100644
index 0000000..2973c30
--- /dev/null
+++ b/Falcon.TaskScheduling/ITaskScheduler.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Falcon.TaskScheduling
+{
+ ///
+ /// 任务调度器
+ ///
+ public interface ITaskScheduler
+ {
+ ///
+ /// 将任务加入普通任务调度队列
+ ///
+ /// 队列序号。序号从0开始到定义的队列总数-1
+ /// 要加入的任务
+ void AddQueue(int weight,ITaskUnit task);
+ ///
+ /// 将任务加入系统级队列。如果不允许则抛出异常
+ ///
+ /// 要加入的任务
+ void AddSystemQueue(ITaskUnit task);
+ ///
+ /// 将任务加入优先级队列。如果不允许则抛出异常
+ ///
+ /// 要加入的任务
+ void AddFirstQueue(ITaskUnit task);
+ ///
+ /// 将任务加入空闲队列。如果不允许则抛出异常
+ ///
+ /// 要加入的任务
+ void AddIdleQueue(ITaskUnit task);
+ }
+}
diff --git a/Falcon.TaskScheduling/ITaskUnit.cs b/Falcon.TaskScheduling/ITaskUnit.cs
new file mode 100644
index 0000000..eab70b6
--- /dev/null
+++ b/Falcon.TaskScheduling/ITaskUnit.cs
@@ -0,0 +1,24 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Falcon.TaskScheduling
+{
+ ///
+ /// 任务执行单元
+ ///
+ public interface ITaskUnit
+ {
+ ///
+ /// 执行任务
+ ///
+ /// 任务返回结果
+ object Run();
+ ///
+ /// 任务执行结束回调
+ ///
+ /// 返回结果
+ /// 任务引发的异常
+ void TaskCallback(object result,Exception ex);
+ }
+}
diff --git a/Falcon.TaskScheduling/NotSupportException.cs b/Falcon.TaskScheduling/NotSupportException.cs
new file mode 100644
index 0000000..d6f7512
--- /dev/null
+++ b/Falcon.TaskScheduling/NotSupportException.cs
@@ -0,0 +1,15 @@
+using System;
+
+namespace Falcon.TaskScheduling
+{
+ ///
+ /// 配置参数不支持该队列
+ ///
+ public class NotSupportException:Exception
+ {
+ ///
+ /// 实例化一个不支持该队列的异常
+ ///
+ public NotSupportException() : base("配置参数不支持该队列") { }
+ }
+}
diff --git a/Falcon.TaskScheduling/QueueUnit.cs b/Falcon.TaskScheduling/QueueUnit.cs
new file mode 100644
index 0000000..7407c0a
--- /dev/null
+++ b/Falcon.TaskScheduling/QueueUnit.cs
@@ -0,0 +1,27 @@
+using System.Collections.Concurrent;
+
+namespace Falcon.TaskScheduling
+{
+ ///
+ /// 队列单元
+ ///
+ public class QueueUnit
+ {
+ ///
+ /// 队列
+ ///
+ public ConcurrentQueue Queue { get; set; } = new ConcurrentQueue();
+ ///
+ /// 队列权重编号
+ ///
+ public int Weight { get; set; }
+ ///
+ /// 获取队列是否有任务
+ ///
+ public bool HasTask {
+ get {
+ return Queue.Count > 0;
+ }
+ }
+ }
+}
diff --git a/Falcon.TaskScheduling/TaskSchedulerOption.cs b/Falcon.TaskScheduling/TaskSchedulerOption.cs
new file mode 100644
index 0000000..285b151
--- /dev/null
+++ b/Falcon.TaskScheduling/TaskSchedulerOption.cs
@@ -0,0 +1,26 @@
+namespace Falcon.TaskScheduling
+{
+ ///
+ /// 任务调度器参数
+ ///
+ public class TaskSchedulerOption
+ {
+ ///
+ /// 允许系统级任务队列,优先级最高,只要有任务优先调度。
+ ///
+ public bool EnableSystemQueue { get; set; } = true;
+ ///
+ /// 允许优先任务队列,在系统级队列无任务执行后进行调度。
+ ///
+ public bool EnableFirstQueue { get; set; } = true;
+ ///
+ /// 允许空闲任务队列,其他队列无任务时调用
+ ///
+ public bool EnableIdleQueue { get; set; } = true;
+ ///
+ /// 普通优先级任务队列定义。按照提供的整数(调度概率)序列实现队列,数字越大队列调度概率越高
+ ///
+ /// 例如定义10,5,表示15次调用中第一个队列调度10次,第二个队列5次。
+ public string QueueDefine { get; set; } = "2,1";
+ }
+}
diff --git a/Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs b/Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs
new file mode 100644
index 0000000..43d5a4a
--- /dev/null
+++ b/Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs
@@ -0,0 +1,132 @@
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Falcon.TaskScheduling;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Falcon.TaskSchedulingTests;
+
+namespace Falcon.TaskScheduling.Tests
+{
+ [TestClass()]
+ public class ConcurrentQueueTaskTests
+ {
+ [TestMethod()]
+ public void AddFirstQueueTest() {
+ ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableFirstQueue = true,
+ });
+ ts.AddFirstQueue(new TaskUnit());
+ Assert.IsTrue(ts.FirstQueue.Count == 1);
+ ts.AddFirstQueue(new TaskUnit());
+ Assert.IsTrue(ts.FirstQueue.Count == 2);
+
+ ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableFirstQueue = false,
+ });
+ try {
+ ts.AddFirstQueue(new TaskUnit());
+ } catch(NotSupportedException ex) {
+
+ }
+ Assert.IsNull(ts.FirstQueue);
+ }
+
+ [TestMethod()]
+ public void AddIdleQueueTest() {
+ ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableIdleQueue = true,
+ });
+ ts.AddIdleQueue(new TaskUnit());
+ Assert.IsTrue(ts.IdleQueue.Count == 1);
+ ts.AddIdleQueue(new TaskUnit());
+ Assert.IsTrue(ts.IdleQueue.Count == 2);
+
+ ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableIdleQueue = false,
+ });
+ try {
+ ts.AddIdleQueue(new TaskUnit());
+ } catch(NotSupportedException ex) {
+
+ }
+ Assert.IsNull(ts.IdleQueue);
+ }
+
+ [TestMethod()]
+ public void AddSystemQueueTest() {
+ ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableSystemQueue = true,
+ });
+ ts.AddSystemQueue(new TaskUnit());
+ Assert.IsTrue(ts.SystemQueue.Count == 1);
+ ts.AddSystemQueue(new TaskUnit());
+ Assert.IsTrue(ts.SystemQueue.Count == 2);
+
+ ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableSystemQueue = false,
+ });
+ try {
+ ts.AddSystemQueue(new TaskUnit());
+ } catch(NotSupportedException ex) {
+
+ }
+ Assert.IsNull(ts.SystemQueue);
+ }
+
+ [TestMethod()]
+ public void AddQueueTest() {
+ ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ QueueDefine = "50,40",
+ });
+ Assert.IsTrue(ts.Queue.Count == 2);
+ Assert.IsTrue(ts.Queue.ToArray()[0].Queue.Count == 0);
+ Assert.IsTrue(ts.Queue.ToArray()[0].Weight == 50);
+ Assert.IsTrue(ts.Queue.ToArray()[1].Queue.Count == 0);
+ Assert.IsTrue(ts.Queue.ToArray()[1].Weight == 40);
+ }
+
+ [TestMethod()]
+ public void StartTest() {
+ ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption {
+ EnableFirstQueue = true,
+ EnableIdleQueue = true,
+ EnableSystemQueue = true,
+ QueueDefine = "50,40",
+ });
+ var tul = new List();
+ tul.Add(new TaskUnit("1",TaskType.system));
+ tul.Add(new TaskUnit("2",TaskType.system));
+ tul.Add(new TaskUnit("3",TaskType.idel));
+ tul.Add(new TaskUnit("4",TaskType.first));
+ tul.Add(new TaskUnit("5",TaskType.queue));
+ tul.Add(new TaskUnit("6",TaskType.queue));
+
+ foreach(var t in tul) {
+ switch(t.Type) {
+ case TaskType.system:
+ ts.AddSystemQueue(t);
+ break;
+ case TaskType.first:
+ ts.AddFirstQueue(t);
+ break;
+ case TaskType.queue:
+ ts.AddQueue(50,t);
+ break;
+ case TaskType.idel:
+ ts.AddIdleQueue(t);
+ break;
+ default:
+ break;
+ }
+ }
+ ts.Start();
+ Assert.IsTrue(tul.Count == TaskUnit.ResultList.Count);
+ string wl = "";
+ foreach(var tu in TaskUnit.ResultList) {
+ Assert.IsTrue(tu.State == tu.Result);
+ wl += tu.Result;
+ }
+ Assert.IsTrue(wl == "124563");
+ }
+ }
+}
\ No newline at end of file
diff --git a/Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj b/Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj
new file mode 100644
index 0000000..ec54f08
--- /dev/null
+++ b/Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj
@@ -0,0 +1,22 @@
+
+
+
+ netcoreapp3.1
+
+ false
+
+ 8.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Falcon.TaskSchedulingTests/TaskUnit.cs b/Falcon.TaskSchedulingTests/TaskUnit.cs
new file mode 100644
index 0000000..d9e9c92
--- /dev/null
+++ b/Falcon.TaskSchedulingTests/TaskUnit.cs
@@ -0,0 +1,43 @@
+using System;
+using System.Collections.Generic;
+using Falcon.TaskScheduling;
+
+namespace Falcon.TaskSchedulingTests
+{
+ public class TaskUnit:ITaskUnit
+ {
+ public string State { get; set; }
+ public string Result { get; set; }
+ public TaskType Type { get; set; }
+
+ public static List ResultList { get; set; } = new List();
+
+ public TaskUnit() {
+ this.State = "";
+ }
+ public TaskUnit(string state,TaskType type) {
+ this.State = state;
+ this.Type = type;
+ }
+ public object Run() {
+ return this.State as object;
+ }
+
+ public void TaskCallback(object result,Exception ex) {
+ if(result is string str) {
+ this.Result = str;
+ } else {
+ this.Result = "";
+ }
+ ResultList.Add(this);
+ }
+ }
+
+ public enum TaskType
+ {
+ system,
+ first,
+ queue,
+ idel,
+ }
+}