using AutoMapper; using Edge.Core.Configuration; using Edge.Core.Database.Configuration.Models; using Edge.Core.Processor.Communicator; using Edge.Core.Processor.Dispatcher.Attributes; using Edge.Core.UniversalApi; using Edge.Core.UniversalApi.CommunicationProvider; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Newtonsoft.Json.Schema; using Newtonsoft.Json.Schema.Generation; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Runtime.Loader; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using System.Text.RegularExpressions; using System.Threading.Tasks; namespace Edge.Core.Processor.Dispatcher { [UniversalApi(Name = DefaultDispatcher.OnProcessorsLifeCycleChangedEventApiName, EventDataType = typeof(OnProcessorsLifeCycleChangedEventArg), Description = "When life cycle of processors are changed, an event will fired with details for each processor.")] [UniversalApi(Name = GenericAlarm.UniversalApiEventName, EventDataType = typeof(object), Description = "Fire GenericAlarm which mostly used in Alarm UI Bar for attracting users.")] [MetaPartsDescriptor("DefaultDispatcher", "Processors' life cycle manager.", new[] { "Managment" }, IsSystemInternalComponent = true)] public partial class DefaultDispatcher : IAppProcessor { public const string OnProcessorsLifeCycleChangedEventApiName = "OnProcessorsLifeCycleChanged"; private List historyProcessorsLifeCycleChangedEvents = new List(); public event EventHandler OnProcessorInstantiated; /// /// latest instantiated processor result. /// private IEnumerable currentProcessorInstantiatedOperatingResults; //public static EventHandler OnRequestDispatch; protected IServiceProvider services; protected static ILogger mainLogger = NullLogger.Instance; public string MetaConfigName { get; set; } = "ProcessorsDispatcher"; public static string DefaultClientAcceptLanguage = "zh-cn"; static DefaultDispatcher() { // replace with your license key, https://www.newtonsoft.com/jsonschema string licenseKey = "4278-fsXeHk11Z1h23Ki2z21y8tDMv43+eMW7Gcnv6caaGWz6w8Q4xrsg/Ow/moZc6GrkgjCaiq33yzr/meweRwNrPHsfqTc1rgpJu385YKKqMbJ5UKUDV1csm5ZwpMBIc1JJDbQwVZuDqUyOFqLZ31uk+10j9i8uMTPGEzLI729edmx7IklkIjo0Mjc4LCJFeHBpcnlEYXRlIjoiMjAyMS0wNy0wNFQwNjoxMjozMC43Mzc1NTE1WiIsIlR5cGUiOiJKc29uU2NoZW1hQnVzaW5lc3MifQ=="; License.RegisterLicense(licenseKey); //AppDomain.CurrentDomain.TypeResolve += (s, args) => //{ // var type = ObjectInstanceCreator.DefaultSearchLocalAssemblyFileTypeResolver(args.Name); // return type?.Assembly; //}; //AppDomain.CurrentDomain.AssemblyResolve += (s, args) => //{ // return ObjectInstanceCreator.DefaultSearchLocalAssemblyFileAssemblyResolver(args.Name); //}; } public DefaultDispatcher(IServiceProvider services) { this.services = services; var loggerFactory = services.GetRequiredService(); mainLogger = loggerFactory.CreateLogger("Main"); ObjectInstanceCreator.mainLogger = mainLogger; mainLogger.LogInformation("AppDomain.CurrentDomain.CurrentDomainAssemblies: " + ObjectInstanceCreator.CurrentDomainAssemblies?.Select(ass => ass.FullName + " " + (!ass.IsDynamic ? ass.Location ?? "" : "")).OrderBy(o => o) .Aggregate(Environment.NewLine, (acc, n) => acc + n + Environment.NewLine) ?? ""); //route this special event into this processor rather than from different processors to form an unified api entrance. this.services.GetRequiredService().OnUniversalGenericAlarmEventFired += async (s, a) => { await this.services.GetRequiredService().FireEvent(this, GenericAlarm.UniversalApiEventName, new { Title = a.GenericAlarm.Title.LocalizedContent(DefaultClientAcceptLanguage), Category = a.GenericAlarm.Category?.LocalizedContent(DefaultClientAcceptLanguage), SubCategory = a.GenericAlarm.SubCategory?.LocalizedContent(DefaultClientAcceptLanguage), Detail = a.GenericAlarm.Detail?.LocalizedContent(DefaultClientAcceptLanguage), a.GenericAlarm.Severity, a.PersistId }); }; } /// /// /// /// /// public virtual async Task> CreateProcessorsAsync(string reason, bool enableSafeBoot = false) { var processorLoader = this.services.GetRequiredService(); var defaultDispatcherProcessorInstanceOperatingResult = new ProcessorInstanceOperatingResult() { Succeed = true, ProcessorInstance = this, //for this special AppProcessor as it's included in Edge.Core, so both version are the same HostVersion = Assembly.GetAssembly(this.GetType()).GetName().Version.ToString(), CoreVersion = Assembly.GetAssembly(this.GetType()).GetName().Version.ToString(), }; if (enableSafeBoot) this.currentProcessorInstantiatedOperatingResults = new[] { defaultDispatcherProcessorInstanceOperatingResult }; else this.currentProcessorInstantiatedOperatingResults = new[] { defaultDispatcherProcessorInstanceOperatingResult }.Concat(processorLoader.CreateAll()).ToList(); this.OnProcessorInstantiated?.Invoke(this, new OnProcessorsInstantiatedEventArg(this.currentProcessorInstantiatedOperatingResults)); var createEvtArg = new OnProcessorsLifeCycleChangedEventArg( this.currentProcessorInstantiatedOperatingResults.Select(i => ProcessorLifeCycleOperationResult.From(i)).ToList(), "ProcessorsInstantiate", reason); this.historyProcessorsLifeCycleChangedEvents.Add(createEvtArg); return this.currentProcessorInstantiatedOperatingResults; } public virtual async Task> StartProcessorsAsync( IEnumerable processorInstances, string reason) { //App will call Init(...) before Start() var initOrStartResults = new List(); mainLogger.LogInformation("==============Start Processors (total: " + processorInstances.Count() + ")=============="); Console.WriteLine(); Console.WriteLine("==============Start Processors (total: " + processorInstances.Count() + ")=============="); var pendingForStartInstances = processorInstances.ToList(); // call App.Init(...), and only succeed ones will call its Start() later. processorInstances.OfType().ToList().ForEach(app => { try { app.Init(processorInstances); } catch (Exception eee) { //remove it for avoid call Start() pendingForStartInstances.Remove(app); initOrStartResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, ProcessorInstance = app, Succeed = false, FailedReason = "Call App.Init(...) exceptioned: " + eee.ToString(), //Description = $"Ordinary start a processor but failed with an unexpected fatal error", }); mainLogger.LogError("Initing [" + (app.MetaConfigName ?? "") + "] failed and will skip its Start: " + eee.ToString()); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine("Initing [" + (app.MetaConfigName ?? "") + "] failed: " + Environment.NewLine + " " + eee.ToString().Substring(0, 70) + "..."); Console.ResetColor(); } }); for (int i = 0; i < pendingForStartInstances.Count(); i++) { var p = pendingForStartInstances.ElementAt(i); try { mainLogger.LogInformation("Starting [" + (p.MetaConfigName ?? "") + "]: "); Console.WriteLine("Starting [" + (p.MetaConfigName ?? "") + "]: "); bool r = await p.Start(); if (r) { initOrStartResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, ProcessorInstance = p, Succeed = true, //Description = $"Ordinary start a processor and succeed.", }); mainLogger.LogInformation(" Started"); Console.WriteLine(" Started"); } else { initOrStartResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, ProcessorInstance = p, Succeed = false, //Description = $"Ordinary start a processor but failed with a graceful reason", }); mainLogger.LogInformation(" Failed for starting"); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(" !!!Failed for starting!!!"); Console.ResetColor(); } } catch (Exception eee) { initOrStartResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, ProcessorInstance = p, Succeed = false, FailedReason = eee.ToString(), //Description = $"Ordinary start a processor but failed with an unexpected fatal error", }); mainLogger.LogError(" - Failed for start: [" + p.MetaConfigName + "]" + Environment.NewLine + " " + eee.ToString()); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(" - Failed for start: [" + p.MetaConfigName + "]" + Environment.NewLine + " " + eee.ToString().Substring(0, 70) + "..."); Console.ResetColor(); } } mainLogger.LogInformation("==============Start Processors End==============\r\n"); Console.WriteLine("==============Start Processors End==============\r\n"); var startEvtArg = new OnProcessorsLifeCycleChangedEventArg( initOrStartResults.Select(i => ProcessorLifeCycleOperationResult.From(i)).ToList(), "ProcessorsStart", reason); var universalApiHub = this.services.GetRequiredService(); await universalApiHub.FireEvent(this, DefaultDispatcher.OnProcessorsLifeCycleChangedEventApiName, startEvtArg); this.historyProcessorsLifeCycleChangedEvents.Add(startEvtArg); return initOrStartResults; } /// /// Call Stop() on target processors, and correlated life cycle operation results will be recorded. /// /// /// /// public virtual async Task> StopProcessorsAsync( IEnumerable processors, string reason) { if (processors == null || !processors.Any()) return Enumerable.Empty(); var stopResults = new List(); foreach (var p in processors) { try { Console.WriteLine(" stopping " + p.MetaConfigName + "..." + " by reason: " + (reason ?? "")); mainLogger.LogInformation(" stopping " + p.MetaConfigName + " by reason: " + (reason ?? "")); bool r = await p.Stop(); if (r) { stopResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, //ProcessorMetaConfigName = p.MetaConfigName, ProcessorInstance = p, Succeed = true, //Description = $"Ordinary stop a processor and succeed.", }); Console.WriteLine(" Stopped"); mainLogger.LogInformation(" Stopped"); } else { stopResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, //ProcessorMetaConfigName = p.MetaConfigName, ProcessorInstance = p, Succeed = false, //Description = $"Ordinary stop a processor but failed with a graceful reason", }); } } catch (Exception xxxx) { stopResults.Add(new ProcessorInstanceOperatingResult() { //WhatThisProcessorUsedFor = p.ProcessorDescriptor().DeviceHandlerOrApp.GetType().GetCustomAttributes()?.FirstOrDefault()?.Description, //ProcessorMetaConfigName = p.MetaConfigName, Succeed = false, FailedReason = xxxx.ToString(), //Description = $"Ordinary stop a processor but failed with an unexpected fatal error", }); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(" stopping " + p.MetaConfigName + " failed."); Console.ResetColor(); mainLogger.LogInformation(" stopping " + p.MetaConfigName + " failed: " + Environment.NewLine + xxxx); } } var stopEvtArg = new OnProcessorsLifeCycleChangedEventArg( stopResults.Select(i => ProcessorLifeCycleOperationResult.From(i)).ToList(), "ProcessorsStop", reason); this.historyProcessorsLifeCycleChangedEvents.Add(stopEvtArg); var universalApiHub = this.services.GetRequiredService(); await universalApiHub.FireEvent(this, DefaultDispatcher.OnProcessorsLifeCycleChangedEventApiName, stopEvtArg); return stopResults; } [UniversalApi(Description = "Get history datas for Processors' LifeCycle Changed Event")] public Task> GetHistoryProcessorsLifeCycleChangedEvents() { return Task.FromResult(this.historyProcessorsLifeCycleChangedEvents); } [UniversalApi] public async Task UpsertProcessorMetaConfigAsync(ProcessorMetaConfig input) { var accessor = this.services.GetService(); if (accessor == null) throw new NotSupportedException("Could not find ProcessorMetaConfigAccessor, may current ProcessorLoader does not support the MetaConfig Upsert???"); return await accessor.UpsertMetaConfigAsync(input); } [UniversalApi(Description = "Get all ProcessorMetaConfigs with endPointFullTypeStr qualified with input parameter, or leave null or empty to get all")] public async Task> GetProcessorMetaConfigAsync(string sourceEndpointFullTypeStr) { var accessor = this.services.GetService(); if (accessor == null) throw new NotSupportedException("Could not find ProcessorMetaConfigAccessor, may current ProcessorLoader does not support the MetaConfig Get???"); return await accessor.GetMetaConfigAsync(sourceEndpointFullTypeStr); } [UniversalApi(InputParametersExampleJson = "[121]", Description = "Remove a ProcessorMetaConfig and all its MetaPartsConfigs")] public async Task RemoveProcessorMetaConfigAsync(int metaConfigId) { var accessor = this.services.GetService(); if (accessor == null) throw new NotSupportedException("Could not find ProcessorMetaConfigAccessor, may current ProcessorLoader does not support the MetaConfig Remove???"); return await accessor.RemoveMetaConfigAsync(metaConfigId); } [UniversalApi(InputParametersExampleJson = "[\"wayne dart pump configuration 1\"]", Description = "Test by constructing a Processor instance on the air from specified metaConfig, start, test and then stop it, any error will treat as test failed.")] public async Task TestProcessorMetaConfigAsync(string configName, string[] parameters) { var processorLoader = this.services.GetRequiredService(); IProcessor p = null; try { p = processorLoader.Create(configName); if (p == null) throw new InvalidOperationException($"Test failed on constructing processor.");//: {p.MetaConfigName}"); var startResult = false; try { startResult = await p.Start(); if (!startResult) throw new InvalidOperationException($"Test failed on starting processor."); await p.Test(parameters); } catch (Exception eee) { throw;// new InvalidOperationException($"Test failed on pre-starting processor with error: {eee}"); } } finally { if (p != null) { var stopResult = false; try { stopResult = await p.Stop(); } catch (Exception eee) { throw;// new InvalidOperationException($"Test failed on pre-stop processor with error: {eee}"); } if (!stopResult) throw new InvalidOperationException($"Test failed on stopping processor."); } } return true; } /// /// /// /// /// the original ParametersJsonArrayStr if no resolve method defined in target meta part type, or updated value by call the resolve method [UniversalApi] public async Task ResolveProcessorMetaPartsConfigCompatibilityAsync(int processorMetaPartsConfigId) { var accessor = this.services.GetService(); if (accessor == null) throw new NotSupportedException("Could not find ProcessorMetaConfigAccessor, may current ProcessorLoader does not support the MetaConfig Get???"); var allMetaConfigs = await accessor.GetMetaConfigAsync(); var targetMetaPartsInfo = allMetaConfigs.SelectMany(mc => mc.Parts, (metaConfig, metaPartsConfig) => new { metaConfig, metaPartsConfig }) .FirstOrDefault(p => p.metaPartsConfig.Id == processorMetaPartsConfigId); if (targetMetaPartsInfo == null) throw new ArgumentException("Could not find processorMetaPartsConfig with id: " + processorMetaPartsConfigId); if (targetMetaPartsInfo.metaPartsConfig.TryCallMetaPartsConfigCompatibilityMethodAndUpdate()) { //targetMetaPartsInfo.metaConfig.Parts.First(p => p.Id == targetMetaPartsInfo.metaPartsConfig.Id) // .ParametersJsonArrayStr = reformatParamsStr; //await accessor.UpsertMetaConfigAsync(targetMetaPartsInfo.metaConfig); } return targetMetaPartsInfo.metaPartsConfig.ParametersJsonArrayStr; } [UniversalApi(Description = "Stop all exist non-system processors, and re-instantiate, re-start them again")] public async Task> ReloadProcessorsAsync(string reason = "") { try { mainLogger.LogInformation($"Stop Non-System Processors is on requesting by reason: {reason}"); Console.BackgroundColor = ConsoleColor.Yellow; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + $"=====>Stop All Processors is on requesting by reason: {reason}"); Console.ResetColor(); var stopResults = await this.StopProcessorsAsync( this.currentProcessorInstantiatedOperatingResults.Where(r => r.ProcessorInstance != null) .Select(r => r.ProcessorInstance).Except(new[] { this }), reason); mainLogger.LogInformation(" Stop Non-System Processors done successfully"); Console.BackgroundColor = ConsoleColor.Green; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + "<=====Stop All Processors done successfully"); Console.ResetColor(); //return stopResults; } catch (Exception exx) { mainLogger.LogError("******Stop Non-System Processors got internal error!\r\n" + exx.ToString()); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + "******Stop All Processors got internal error: " + exx.ToString().Substring(0, 70) + "..."); Console.ResetColor(); throw; } try { mainLogger.LogInformation($"Create All Processors is on requesting by reason: {reason}"); Console.BackgroundColor = ConsoleColor.Yellow; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + $"=====>Create All Processors is on requesting by reason: {reason}"); Console.ResetColor(); var createResults = await this.CreateProcessorsAsync(reason); mainLogger.LogInformation(" Create All Processors done successfully"); Console.BackgroundColor = ConsoleColor.Green; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + "<=====Create All Processors done successfully"); Console.ResetColor(); //return createResults; } catch (Exception exx) { mainLogger.LogError("******Create All Processors got internal error!\r\n" + exx.ToString()); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + "******Create All Processors got internal error: " + exx.ToString().Substring(0, 70) + "..."); Console.ResetColor(); throw; } try { mainLogger.LogInformation($"Start All Processors is on requesting by reason: {reason}"); Console.BackgroundColor = ConsoleColor.Yellow; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + $"=====>Start All Processors is on requesting by reason: {reason}"); Console.ResetColor(); var startResults = await this.StartProcessorsAsync( this.currentProcessorInstantiatedOperatingResults.Where(r => r.Succeed).Select(r => r.ProcessorInstance), reason); mainLogger.LogInformation(" Start All Processors done successfully"); Console.BackgroundColor = ConsoleColor.Green; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + "<=====Start All Processors done successfully"); Console.ResetColor(); return startResults.Select(r => ProcessorLifeCycleOperationResult.From(r)); } catch (Exception exx) { mainLogger.LogError("******Start All Processors got internal error!\r\n" + exx.ToString()); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff") + "******Start All Processors got internal error: " + exx.ToString().Substring(0, 70) + "..."); Console.ResetColor(); throw; } } /// /// /// /// localmqtt, remotemqtt, webapi /// /// [UniversalApi(InputParametersExampleJson = "[\"localMqtt\",[\"Pump\",\"ATG\"]]")] public async Task> ShowMeApi(string apiType, string[] tags) { var communicationProvider = this.services.GetService()?.CommunicationProviders?.Where(p => p.GetType().Name.Contains(apiType, StringComparison.OrdinalIgnoreCase))?.FirstOrDefault(); if (communicationProvider == null) return Enumerable.Empty(); return communicationProvider.GetApiDocuments().FilterByTags(tags); } public void Init(IEnumerable processors) { } public async Task Start() { try { Console.WriteLine(" Setup Universal Api communicators..."); await services.GetRequiredService() .InitAsync(this.currentProcessorInstantiatedOperatingResults.Where(r => r.Succeed).Select(r => r.ProcessorInstance)); Console.WriteLine(" Setup finished."); } catch (Exception exxx) { mainLogger.LogInformation(" Setup UniversalApiHub Failed: " + exxx); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(" Setup UniversalApiHub Failed: " + exxx); Console.ResetColor(); throw; } return true; } public Task Stop() { mainLogger.LogError("Stopping DefaultDispatcher processor..."); Console.BackgroundColor = ConsoleColor.Red; Console.ForegroundColor = ConsoleColor.Black; Console.WriteLine(" FATAL, Stopping DefaultDispatcher will lose major functions..."); Console.ResetColor(); this.services.GetRequiredService().CommunicationProviders.ToList().ForEach(c => c.Dispose()); return Task.FromResult(true); } } }