diff --git a/Falcon.TaskScheduling.sln b/Falcon.TaskScheduling.sln index 8b813e1..0b10b80 100644 --- a/Falcon.TaskScheduling.sln +++ b/Falcon.TaskScheduling.sln @@ -3,7 +3,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 VisualStudioVersion = 16.0.29613.14 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Falcon.TaskScheduling", "Falcon.TaskScheduling\Falcon.TaskScheduling.csproj", "{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Falcon.TaskScheduling", "Falcon.TaskScheduling\Falcon.TaskScheduling.csproj", "{EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Falcon.TaskSchedulingTests", "Falcon.TaskSchedulingTests\Falcon.TaskSchedulingTests.csproj", "{2FD44ACF-CEE9-4A50-870E-28F85865066F}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -15,6 +17,10 @@ Global {EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}.Debug|Any CPU.Build.0 = Debug|Any CPU {EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}.Release|Any CPU.ActiveCfg = Release|Any CPU {EF69D4E0-A9BD-4D44-B5BA-758D2A7668DF}.Release|Any CPU.Build.0 = Release|Any CPU + {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2FD44ACF-CEE9-4A50-870E-28F85865066F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Falcon.TaskScheduling/ConcurrentQueueTask.cs b/Falcon.TaskScheduling/ConcurrentQueueTask.cs new file mode 100644 index 0000000..1710874 --- /dev/null +++ b/Falcon.TaskScheduling/ConcurrentQueueTask.cs @@ -0,0 +1,182 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using Falcon.Extend; +using System.Linq; +using System.Threading; + +namespace Falcon.TaskScheduling +{ + /// + /// 安全队列实现 + /// + public class ConcurrentQueueTask:ITaskScheduler + { + /// + /// 选项 + /// + public TaskSchedulerOption Option { get; set; } + /// + /// 系统级队列 + /// + public ConcurrentQueue SystemQueue; + /// + /// 优先级队列 + /// + public ConcurrentQueue FirstQueue; + /// + /// 空闲级队列 + /// + public ConcurrentQueue IdleQueue; + /// + /// 任务队列 + /// + public List Queue; + + /// + /// 通过提供的选项实例化任务调度程序 + /// + /// 配置参数 + public ConcurrentQueueTask(TaskSchedulerOption option = null) { + this.Option = option ?? new TaskSchedulerOption(); + if(this.Option.EnableSystemQueue) { + this.SystemQueue = GetNewQueue(); + } + if(this.Option.EnableFirstQueue) { + this.FirstQueue = GetNewQueue(); + } + if(this.Option.EnableIdleQueue) { + this.IdleQueue = GetNewQueue(); + } + this.Queue = new List(); + if(this.Option.QueueDefine.IsNotNullOrEmpty()) { + var spc = ",;,;"; + var q = this.Option.QueueDefine.Split(spc.ToCharArray()); + foreach(var i in q) { + if(int.TryParse(i,out var number)) { + this.Queue.Add(new QueueUnit { + Queue = GetNewQueue(), + Weight = number, + }); + } else { + throw new Exception($"QueueDefine值{this.Option.QueueDefine}定义错误"); + } + } + } + } + /// + /// 获取一个新的安全队列 + /// + protected ConcurrentQueue GetNewQueue() { + return new ConcurrentQueue(); + } + + public void AddFirstQueue(ITaskUnit task) { + if(!this.Option.EnableFirstQueue) { + throw new NotSupportedException(); + } + this.FirstQueue.Enqueue(task); + } + + public void AddIdleQueue(ITaskUnit task) { + if(!this.Option.EnableIdleQueue) { + throw new NotSupportedException(); + } + this.IdleQueue.Enqueue(task); + } + + public void AddSystemQueue(ITaskUnit task) { + if(!this.Option.EnableSystemQueue) { + throw new NotSupportedException(); + } + this.SystemQueue.Enqueue(task); + } + + public void AddQueue(int weight,ITaskUnit task) { + var c = this.Queue.Count(); + if(c == 0) { + throw new NotSupportedException(); + } + var queue = this.Queue.Where(m => m.Weight == weight); + if(!queue.Any()) { + throw new Exception("添加到任务队列提供的weight值错误。必须提供TaskSchedulerOption.QueueDefine中定义的值。"); + } + queue.First().Queue.Enqueue(task); + } + + private bool _isRun = false; + private static object _lockObj = new object(); + /// + /// 开始执行调度任务 + /// + public void Start() { + if(!_isRun) { + lock(_lockObj) { + if(!_isRun) { + _isRun = true; + var task = GetNextTask(); + while(task != null) { + object result = null; + Exception exp = null; + try { + result = task.Run(); + } catch(Exception ex) { + exp = ex; + } + task.TaskCallback(result,exp); + Thread.Sleep(0); + task = GetNextTask(); + } + _isRun = false; + } + } + } + } + + protected ITaskUnit GetNextTask() { + ITaskUnit t; + if(this.Option.EnableSystemQueue) { + if(this.SystemQueue.TryDequeue(out t)) { + return t; + } + } + if(this.Option.EnableFirstQueue) { + if(this.FirstQueue.TryDequeue(out t)) { + return t; + } + } + t = GetTaskFromQueue(); + if(t != null) { + return t; + } + if(this.Option.EnableIdleQueue) { + if(this.IdleQueue.TryDequeue(out t)) { + return t; + } + } + return null; + } + + private ITaskUnit GetTaskFromQueue() { + var ql = this.Queue.Where(m => m.HasTask); + var weight = ql.Sum(m => m.Weight); + var point = GetRander(weight); + foreach(var q in ql) { + if(point < q.Weight) { + if(q.Queue.TryDequeue(out var tu)) { + return tu; + } + } + point -= q.Weight; + } + return null; + } + + private static Random RandomNum = new Random(); + + protected int GetRander(int max) { + return RandomNum.Next(0,max); + } + } +} diff --git a/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj b/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj index 65e7db3..90de67f 100644 --- a/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj +++ b/Falcon.TaskScheduling/Falcon.TaskScheduling.csproj @@ -9,4 +9,7 @@ 多任务、多优先级任务调度组件 Falcon + + + diff --git a/Falcon.TaskScheduling/ITaskScheduler.cs b/Falcon.TaskScheduling/ITaskScheduler.cs new file mode 100644 index 0000000..2973c30 --- /dev/null +++ b/Falcon.TaskScheduling/ITaskScheduler.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Falcon.TaskScheduling +{ + /// + /// 任务调度器 + /// + public interface ITaskScheduler + { + /// + /// 将任务加入普通任务调度队列 + /// + /// 队列序号。序号从0开始到定义的队列总数-1 + /// 要加入的任务 + void AddQueue(int weight,ITaskUnit task); + /// + /// 将任务加入系统级队列。如果不允许则抛出异常 + /// + /// 要加入的任务 + void AddSystemQueue(ITaskUnit task); + /// + /// 将任务加入优先级队列。如果不允许则抛出异常 + /// + /// 要加入的任务 + void AddFirstQueue(ITaskUnit task); + /// + /// 将任务加入空闲队列。如果不允许则抛出异常 + /// + /// 要加入的任务 + void AddIdleQueue(ITaskUnit task); + } +} diff --git a/Falcon.TaskScheduling/ITaskUnit.cs b/Falcon.TaskScheduling/ITaskUnit.cs new file mode 100644 index 0000000..eab70b6 --- /dev/null +++ b/Falcon.TaskScheduling/ITaskUnit.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Falcon.TaskScheduling +{ + /// + /// 任务执行单元 + /// + public interface ITaskUnit + { + /// + /// 执行任务 + /// + /// 任务返回结果 + object Run(); + /// + /// 任务执行结束回调 + /// + /// 返回结果 + /// 任务引发的异常 + void TaskCallback(object result,Exception ex); + } +} diff --git a/Falcon.TaskScheduling/NotSupportException.cs b/Falcon.TaskScheduling/NotSupportException.cs new file mode 100644 index 0000000..d6f7512 --- /dev/null +++ b/Falcon.TaskScheduling/NotSupportException.cs @@ -0,0 +1,15 @@ +using System; + +namespace Falcon.TaskScheduling +{ + /// + /// 配置参数不支持该队列 + /// + public class NotSupportException:Exception + { + /// + /// 实例化一个不支持该队列的异常 + /// + public NotSupportException() : base("配置参数不支持该队列") { } + } +} diff --git a/Falcon.TaskScheduling/QueueUnit.cs b/Falcon.TaskScheduling/QueueUnit.cs new file mode 100644 index 0000000..7407c0a --- /dev/null +++ b/Falcon.TaskScheduling/QueueUnit.cs @@ -0,0 +1,27 @@ +using System.Collections.Concurrent; + +namespace Falcon.TaskScheduling +{ + /// + /// 队列单元 + /// + public class QueueUnit + { + /// + /// 队列 + /// + public ConcurrentQueue Queue { get; set; } = new ConcurrentQueue(); + /// + /// 队列权重编号 + /// + public int Weight { get; set; } + /// + /// 获取队列是否有任务 + /// + public bool HasTask { + get { + return Queue.Count > 0; + } + } + } +} diff --git a/Falcon.TaskScheduling/TaskSchedulerOption.cs b/Falcon.TaskScheduling/TaskSchedulerOption.cs new file mode 100644 index 0000000..285b151 --- /dev/null +++ b/Falcon.TaskScheduling/TaskSchedulerOption.cs @@ -0,0 +1,26 @@ +namespace Falcon.TaskScheduling +{ + /// + /// 任务调度器参数 + /// + public class TaskSchedulerOption + { + /// + /// 允许系统级任务队列,优先级最高,只要有任务优先调度。 + /// + public bool EnableSystemQueue { get; set; } = true; + /// + /// 允许优先任务队列,在系统级队列无任务执行后进行调度。 + /// + public bool EnableFirstQueue { get; set; } = true; + /// + /// 允许空闲任务队列,其他队列无任务时调用 + /// + public bool EnableIdleQueue { get; set; } = true; + /// + /// 普通优先级任务队列定义。按照提供的整数(调度概率)序列实现队列,数字越大队列调度概率越高 + /// + /// 例如定义10,5,表示15次调用中第一个队列调度10次,第二个队列5次。 + public string QueueDefine { get; set; } = "2,1"; + } +} diff --git a/Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs b/Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs new file mode 100644 index 0000000..43d5a4a --- /dev/null +++ b/Falcon.TaskSchedulingTests/ConcurrentQueueTaskTests.cs @@ -0,0 +1,132 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Falcon.TaskScheduling; +using System; +using System.Collections.Generic; +using System.Text; +using Falcon.TaskSchedulingTests; + +namespace Falcon.TaskScheduling.Tests +{ + [TestClass()] + public class ConcurrentQueueTaskTests + { + [TestMethod()] + public void AddFirstQueueTest() { + ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableFirstQueue = true, + }); + ts.AddFirstQueue(new TaskUnit()); + Assert.IsTrue(ts.FirstQueue.Count == 1); + ts.AddFirstQueue(new TaskUnit()); + Assert.IsTrue(ts.FirstQueue.Count == 2); + + ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableFirstQueue = false, + }); + try { + ts.AddFirstQueue(new TaskUnit()); + } catch(NotSupportedException ex) { + + } + Assert.IsNull(ts.FirstQueue); + } + + [TestMethod()] + public void AddIdleQueueTest() { + ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableIdleQueue = true, + }); + ts.AddIdleQueue(new TaskUnit()); + Assert.IsTrue(ts.IdleQueue.Count == 1); + ts.AddIdleQueue(new TaskUnit()); + Assert.IsTrue(ts.IdleQueue.Count == 2); + + ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableIdleQueue = false, + }); + try { + ts.AddIdleQueue(new TaskUnit()); + } catch(NotSupportedException ex) { + + } + Assert.IsNull(ts.IdleQueue); + } + + [TestMethod()] + public void AddSystemQueueTest() { + ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableSystemQueue = true, + }); + ts.AddSystemQueue(new TaskUnit()); + Assert.IsTrue(ts.SystemQueue.Count == 1); + ts.AddSystemQueue(new TaskUnit()); + Assert.IsTrue(ts.SystemQueue.Count == 2); + + ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableSystemQueue = false, + }); + try { + ts.AddSystemQueue(new TaskUnit()); + } catch(NotSupportedException ex) { + + } + Assert.IsNull(ts.SystemQueue); + } + + [TestMethod()] + public void AddQueueTest() { + ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption { + QueueDefine = "50,40", + }); + Assert.IsTrue(ts.Queue.Count == 2); + Assert.IsTrue(ts.Queue.ToArray()[0].Queue.Count == 0); + Assert.IsTrue(ts.Queue.ToArray()[0].Weight == 50); + Assert.IsTrue(ts.Queue.ToArray()[1].Queue.Count == 0); + Assert.IsTrue(ts.Queue.ToArray()[1].Weight == 40); + } + + [TestMethod()] + public void StartTest() { + ConcurrentQueueTask ts = new ConcurrentQueueTask(new TaskSchedulerOption { + EnableFirstQueue = true, + EnableIdleQueue = true, + EnableSystemQueue = true, + QueueDefine = "50,40", + }); + var tul = new List(); + tul.Add(new TaskUnit("1",TaskType.system)); + tul.Add(new TaskUnit("2",TaskType.system)); + tul.Add(new TaskUnit("3",TaskType.idel)); + tul.Add(new TaskUnit("4",TaskType.first)); + tul.Add(new TaskUnit("5",TaskType.queue)); + tul.Add(new TaskUnit("6",TaskType.queue)); + + foreach(var t in tul) { + switch(t.Type) { + case TaskType.system: + ts.AddSystemQueue(t); + break; + case TaskType.first: + ts.AddFirstQueue(t); + break; + case TaskType.queue: + ts.AddQueue(50,t); + break; + case TaskType.idel: + ts.AddIdleQueue(t); + break; + default: + break; + } + } + ts.Start(); + Assert.IsTrue(tul.Count == TaskUnit.ResultList.Count); + string wl = ""; + foreach(var tu in TaskUnit.ResultList) { + Assert.IsTrue(tu.State == tu.Result); + wl += tu.Result; + } + Assert.IsTrue(wl == "124563"); + } + } +} \ No newline at end of file diff --git a/Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj b/Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj new file mode 100644 index 0000000..ec54f08 --- /dev/null +++ b/Falcon.TaskSchedulingTests/Falcon.TaskSchedulingTests.csproj @@ -0,0 +1,22 @@ + + + + netcoreapp3.1 + + false + + 8.0 + + + + + + + + + + + + + + diff --git a/Falcon.TaskSchedulingTests/TaskUnit.cs b/Falcon.TaskSchedulingTests/TaskUnit.cs new file mode 100644 index 0000000..d9e9c92 --- /dev/null +++ b/Falcon.TaskSchedulingTests/TaskUnit.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections.Generic; +using Falcon.TaskScheduling; + +namespace Falcon.TaskSchedulingTests +{ + public class TaskUnit:ITaskUnit + { + public string State { get; set; } + public string Result { get; set; } + public TaskType Type { get; set; } + + public static List ResultList { get; set; } = new List(); + + public TaskUnit() { + this.State = ""; + } + public TaskUnit(string state,TaskType type) { + this.State = state; + this.Type = type; + } + public object Run() { + return this.State as object; + } + + public void TaskCallback(object result,Exception ex) { + if(result is string str) { + this.Result = str; + } else { + this.Result = ""; + } + ResultList.Add(this); + } + } + + public enum TaskType + { + system, + first, + queue, + idel, + } +}