From d4acba067c09c530d8295790c677e7fccbd31206 Mon Sep 17 00:00:00 2001 From: taywon18 Date: Fri, 17 May 2024 18:11:36 +0200 Subject: [PATCH] worked on persistence --- Salmon.Core/Depth/EventManager.cs | 48 ++++ Salmon.Core/Depth/Persistence.cs | 9 +- Salmon.Core/Depth/StateMachine.cs | 11 +- Salmon.Core/Instance.cs | 1 + Salmon.Core/Transmitter.cs | 40 +++- Salmon.Web/.config/dotnet-tools.json | 12 + Salmon.Web/Data/MongoDbInterface.cs | 335 +++++++++++++++++++++++++++ Salmon.Web/Pages/ElementPage.razor | 36 ++- Salmon.Web/Pages/EventList.razor | 21 +- Salmon.Web/Program.cs | 27 ++- Salmon.Web/Salmon.Web.csproj | 3 + 11 files changed, 525 insertions(+), 18 deletions(-) create mode 100644 Salmon.Web/.config/dotnet-tools.json create mode 100644 Salmon.Web/Data/MongoDbInterface.cs diff --git a/Salmon.Core/Depth/EventManager.cs b/Salmon.Core/Depth/EventManager.cs index 3c25167..0b83ed4 100644 --- a/Salmon.Core/Depth/EventManager.cs +++ b/Salmon.Core/Depth/EventManager.cs @@ -7,6 +7,7 @@ public class EventManager public long MaxBufferedEvents { get; set; } = 2000; Queue _EventIds = new (); Dictionary _LastEvents = new (); + public Persistence? History { get; set; } object Mutex = new(); public EventManager() @@ -23,6 +24,7 @@ public class EventManager _LastEvents.Remove(_EventIds.Dequeue()); } + [Obsolete] void Set(Event e) { ArgumentNullException.ThrowIfNull(e, "e"); @@ -46,6 +48,8 @@ public class EventManager _EventIds.Enqueue(e.UniqueId); _LastEvents.Add(e.UniqueId, e); } + if(History is not null) + History.RegisterEvent(e); Trim(); } @@ -61,4 +65,48 @@ public class EventManager { return _LastEvents.Values.OrderByDescending(x => x.When).Take(maxCount).ToList(); } + + public async IAsyncEnumerable GetEvents(string? subject = null, + DateTime? from = null, + DateTime? to = null, + int? limit = null, + int? offset = null, + Dictionary? filters = null) + { + if(History is null) + { + IEnumerable ret; + lock (Mutex) + { + ret = _LastEvents + .Values + .OrderByDescending(x => x.When); + } + + if(subject is not null) + ret = ret.Where(x => x.ThrowerId == subject); + + if (from is not null) + ret = ret.Where(x => x.When >= from); + + if (to is not null) + ret = ret.Where(x => x.When <= to); + + if (offset is not null) + ret = ret.Skip(offset.Value); + + if (limit is not null) + ret = ret.Take(limit.Value); + + foreach (var i in ret) + yield return i; + + //TODO: Implement filters + + yield break; + } + + await foreach (var i in History.GetEvents(subject, from, to, limit, offset, filters)) + yield return i; + } } \ No newline at end of file diff --git a/Salmon.Core/Depth/Persistence.cs b/Salmon.Core/Depth/Persistence.cs index 41b47e6..c384f45 100644 --- a/Salmon.Core/Depth/Persistence.cs +++ b/Salmon.Core/Depth/Persistence.cs @@ -25,9 +25,14 @@ public abstract class Persistence }).ConfigureAwait(false); } - public abstract Task GetLastValue(string subject, string predicate); - + public abstract Task GetLastValue(string subject, string predicate); + public abstract IAsyncEnumerable GetState(); protected abstract Task RegisterStateChange(string subject, string predicate, DateTime when, object? value); + + public abstract Task RegisterEvent(Event e); + public abstract Task GetEvent(string id); + public abstract IAsyncEnumerable GetEvents(string? subject = null, DateTime? from = null, DateTime? to = null, int? limit = null, int? offset = null, Dictionary? filters = null); + protected virtual void ExceptionCatched(Exception e) { Console.WriteLine(e.ToString()); diff --git a/Salmon.Core/Depth/StateMachine.cs b/Salmon.Core/Depth/StateMachine.cs index c06415b..1a80207 100644 --- a/Salmon.Core/Depth/StateMachine.cs +++ b/Salmon.Core/Depth/StateMachine.cs @@ -25,10 +25,15 @@ public class StateMachine public async Task PopulateWithHistory() { - if (History is not null) - throw new NotImplementedException(); - Populated = true; + if (History is not null) + await foreach (var i in History.GetState()) + Push(i); + } + + public void Push(Triplet triplet) + { + Push(triplet.key, triplet.predicate, triplet.value, triplet.LastFlush); } /// diff --git a/Salmon.Core/Instance.cs b/Salmon.Core/Instance.cs index c45974d..15031d7 100644 --- a/Salmon.Core/Instance.cs +++ b/Salmon.Core/Instance.cs @@ -15,6 +15,7 @@ public class Instance Persistence = persistence; State.Events = Event; State.History = persistence; + Event.History = persistence; } public async Task Init() diff --git a/Salmon.Core/Transmitter.cs b/Salmon.Core/Transmitter.cs index fe3187f..2b0900b 100644 --- a/Salmon.Core/Transmitter.cs +++ b/Salmon.Core/Transmitter.cs @@ -1,4 +1,6 @@ using System.Net.Http.Json; +using System.Security.Cryptography; +using System.Xml.Linq; namespace Salmon.Core; @@ -6,13 +8,49 @@ public class Transmitter { Cliff.Translator Translator { get; } = new(); HttpClient Client { get; } = new(); + public Uri? BaseUrl { get; set; } = null; public async Task SendAsync(Uri uri, IEnumerable elements, CancellationToken tk = default) { - List triplets = new (); + List triplets = new(); foreach (Element element in elements) triplets.AddRange(Translator.Encode(element)); await Client.PostAsJsonAsync(uri, triplets, cancellationToken: tk); } + + public async Task SendAsync(Uri uri, IEnumerable events, CancellationToken tk = default) + { + await Client.PostAsJsonAsync(uri, events, cancellationToken: tk); + } + + public async Task SendAsync(IEnumerable elements, CancellationToken tk = default) + { + if (BaseUrl is null) + throw new Exception("SendAsync call without defining BaseUrl."); + + Uri uri = new Uri(BaseUrl, "Push/Elements"); + + await Client.PostAsJsonAsync(uri, elements, tk); + } + + public async Task SendAsync(Element element, CancellationToken tk = default) + { + await SendAsync(new[] { element }); + } + + public async Task SendAsync(IEnumerable events, CancellationToken tk = default) + { + if (BaseUrl is null) + throw new Exception("SendAsync call without defining BaseUrl."); + + Uri uri = new Uri(BaseUrl, "Push/Events"); + + await Client.PostAsJsonAsync(uri, events, tk); + } + + public async Task SendAsync(Event ev, CancellationToken tk = default) + { + await SendAsync(new[] { ev }); + } } diff --git a/Salmon.Web/.config/dotnet-tools.json b/Salmon.Web/.config/dotnet-tools.json new file mode 100644 index 0000000..677ed3f --- /dev/null +++ b/Salmon.Web/.config/dotnet-tools.json @@ -0,0 +1,12 @@ +{ + "version": 1, + "isRoot": true, + "tools": { + "dotnet-ef": { + "version": "8.0.4", + "commands": [ + "dotnet-ef" + ] + } + } +} \ No newline at end of file diff --git a/Salmon.Web/Data/MongoDbInterface.cs b/Salmon.Web/Data/MongoDbInterface.cs new file mode 100644 index 0000000..d3e39d0 --- /dev/null +++ b/Salmon.Web/Data/MongoDbInterface.cs @@ -0,0 +1,335 @@ +using Microsoft.AspNetCore.Mvc.Diagnostics; +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using MongoDB.Driver; +using Salmon.Core; +using System; +using System.Collections; +using System.Security.Cryptography; + +namespace Salmon.Web.Data; + +public class MongoDbInterface + : Salmon.Core.Depth.Persistence +{ + private enum Constraint + { + Equal, + NotEqual, + Greater, + GreaterOrEqual, + Less, + LessOrEqual + } + + string ConnectionString = "mongodb://Salmon.Web:28z6gRwbAB22rL@voie93quarts.fr:27017/Salmon"; + public MongoClient Client { get; private set; } + public IMongoDatabase Database { get; private set; } + + public const string TripletsCollectionName = "Triplets"; + public const string EventsCollectionName = "Events"; + + public IMongoCollection TripletsCollection { get; private set; } + public IMongoCollection EventsCollection { get; private set; } + + public MongoDbInterface() + { + Client = new (ConnectionString); + Database = Client.GetDatabase("Salmon"); + + TripletsCollection = Database.GetCollection(TripletsCollectionName); + EventsCollection = Database.GetCollection(EventsCollectionName); + } + + public async Task Test() + { + try + { + await Database.RunCommandAsync((Command)"{ping:1}"); + return true; + } + catch (Exception ex) + { + Console.WriteLine($"Mongodb connection test failed: {ex}"); + return false; + } + } + + public override async Task GetLastValue(string subject, string predicate) + { + var filter = Builders.Filter.And(new[] + { + Builders.Filter.Eq("Subject", subject), + Builders.Filter.Eq("Predicate", predicate) + }); + var sort = Builders.Sort.Ascending("When"); + + using (var cursor = await TripletsCollection.FindAsync(filter, new() + { + Sort = sort, + Limit = 1 + })) + { + if (!await cursor.MoveNextAsync()) + return null; + + var batch = cursor.Current; + foreach(var i in batch) + { + if (!i.Contains("Object")) + throw new Exception("Response value doesn't have any Object field"); + + return DeserializeRawValue(i["Object"]); + } + + return null; + } + + } + + protected override async Task RegisterStateChange(string subject, string predicate, DateTime when, object? value) + { + await TripletsCollection.InsertOneAsync( + new BsonDocument(new[] + { + new KeyValuePair("Subject", subject), + new KeyValuePair("Predicate", predicate), + new KeyValuePair("When", when), + new KeyValuePair("Object", value) + } + )); + } + + public override async IAsyncEnumerable GetState() + { + var src = TripletsCollection + .Aggregate() + .Group(new BsonDocument + { + { "_id", new BsonDocument + { + { "Subject", "$Subject" }, + { "Predicate", "$Predicate" } + } }, + { "Subject", new BsonDocument("$first", "$Subject") }, + { "Predicate", new BsonDocument("$first", "$Predicate") }, + { "When", new BsonDocument("$first", "$When") }, + { "Object", new BsonDocument("$first", "$Object") } + }); + + using (var cursor = await src.ToCursorAsync()) + { + if (!await cursor.MoveNextAsync()) + yield break; + + var batch = cursor.Current; + foreach (var i in batch) + { + if (!i.Contains("Subject")) + throw new Exception("Response value doesn't have any Subject field"); + + if (!i.Contains("Predicate")) + throw new Exception("Response value doesn't have any Predicate field"); + + if (!i.Contains("Object")) + throw new Exception("Response value doesn't have any Object field"); + + if (!i.Contains("When")) + throw new Exception("Response value doesn't have any When field"); + + yield return new Triplet + { + key = i[1].AsString, + predicate = i[2].AsString, + LastFlush = i[3].ToUniversalTime(), + value = DeserializeRawValue(i[4]), + + }; + } + } + } + + static private object? DeserializeRawValue(BsonValue v) + { + if (v.IsBsonNull) + return null; + + if (v.IsBoolean) + return v.AsBoolean; + + if (v.IsString) + return v.AsString; + + if (v.IsValidDateTime) + return v.ToUniversalTime(); + + if (v.IsInt32) + return v.AsInt32; + + if (v.IsInt64) + return v.AsInt64; + + if (v.IsDouble) + return v.AsDouble; + + if (v.IsDecimal128) + return v.AsDecimal128; + + throw new NotImplementedException(); + } + + static private BsonValue SerializeRawValue(object? value) + { + if (value == null) + return BsonNull.Value; + + if (value is bool) return (bool)value; + if (value is string) return (string)value; + if (value is DateTime) return (DateTime)value; + + if (value is byte) return (byte)value; + if (value is short) return (short)value; + if(value is ushort) return (ushort)value; + if (value is int) return (int)value; + if(value is uint) return (uint)value; + if (value is long) return (long)value; + + if(value is float) return (float)value; + if(value is double) return (double)value; + if(value is decimal) return (decimal)value; + + throw new NotImplementedException(); + } + + public override async Task RegisterEvent(Event e) + { + var properties = new BsonDocument(); + foreach (var kv in e.Properties) + properties.Add(kv.Key, SerializeRawValue(kv.Value)); + + var bson = new BsonDocument(new[] + { + new KeyValuePair("_id", e.UniqueId), + new KeyValuePair("Thrower", e.ThrowerId), + new KeyValuePair("Type", e.Type), + new KeyValuePair("When", e.When), + new KeyValuePair("Properties", properties), + }); + + await EventsCollection.InsertOneAsync(bson); + } + + public override Task GetEvent(string id) + { + throw new NotImplementedException(); + } + + private Event EventFromBson(BsonDocument bson) + { + Event ret = new(bson["_id"].AsString, bson["Thrower"].AsString, bson["Type"].AsString, bson["When"].ToUniversalTime()); + foreach (var kv in bson["Properties"].AsBsonDocument) + ret.Properties.Add(kv.Name, kv.Value); + + return ret; + } + + private bool isMainProperty(string name) + { + if (name == "Type") + return true; + + return false; + } + + public override async IAsyncEnumerable GetEvents(string? subject = null, DateTime? from = null, DateTime? to = null, int? limit = null, int? offset = null, Dictionary? filters = null) + { + var conditions = new List>(); + + if (subject is not null) + conditions.Add(Builders.Filter.Eq("Thrower", subject)); + + if(filters is not null) + { + Func> createFilter = (Constraint c, string property, object value) => + { + if (c == Constraint.Equal) + return Builders.Filter.Eq(property, value); + else if (c == Constraint.NotEqual) + return Builders.Filter.Not(Builders.Filter.Eq(property, value)); + else if (c == Constraint.Less) + return Builders.Filter.Lt(property, value); + else if (c == Constraint.LessOrEqual) + return Builders.Filter.Lte(property, value); + else if (c == Constraint.Greater) + return Builders.Filter.Gt(property, value); + else if (c == Constraint.GreaterOrEqual) + return Builders.Filter.Gte(property, value); + + throw new NotImplementedException(); + }; + + foreach (var f in filters) + { + string property; + Constraint constraint; + if (f.Key.StartsWith("!")) + { + property = f.Key.Substring(1); + constraint = Constraint.NotEqual; + } + else if (f.Key.StartsWith("<")) + { + property = f.Key.Substring(1); + constraint = Constraint.Less; + } + else if (f.Key.StartsWith("<=")) + { + property = f.Key.Substring(2); + constraint = Constraint.LessOrEqual; + } + else if (f.Key.StartsWith(">")) + { + property = f.Key.Substring(1); + constraint = Constraint.Greater; + } + else if (f.Key.StartsWith(">=")) + { + property = f.Key.Substring(2); + constraint = Constraint.GreaterOrEqual; + } + else + { + property = f.Key; + constraint = Constraint.Equal; + } + + bool isMain = isMainProperty(property); + if (isMain) + conditions.Add(createFilter(constraint, property, f.Value)); + else + conditions.Add(Builders.Filter.Eq("Properties", createFilter(constraint, property, f.Value))); + } + } + + + + var filter = Builders.Filter.And(conditions); + var sort = Builders.Sort.Descending("When"); + + using (var cursor = await EventsCollection.FindAsync(filter, new() + { + Sort = sort, + Limit = limit, + })) + { + if (!await cursor.MoveNextAsync()) + yield break; + + var batch = cursor.Current; + foreach (var i in batch) + { + yield return EventFromBson(i); + } + } + } +} diff --git a/Salmon.Web/Pages/ElementPage.razor b/Salmon.Web/Pages/ElementPage.razor index ea2ff30..3d4d8da 100644 --- a/Salmon.Web/Pages/ElementPage.razor +++ b/Salmon.Web/Pages/ElementPage.razor @@ -31,6 +31,7 @@ else + @if (Triplets is not null) @foreach (var t in Triplets) { @@ -41,8 +42,26 @@ else } +

Évènements

- + + + + + + + + + @if(Events is not null) + @foreach (var e in Events) + { + + + + + } + +
DateType
@e.When@e.Type
} @code { @@ -54,12 +73,17 @@ else List? Triplets = null; List? Events = null; - + protected async override Task OnInitializedAsync() { await base.OnInitializedAsync(); + await Refresh(); + } + + private async Task Refresh() + { Triplets = Salmon.State.Get(Id).ToList(); if (Triplets.Count == 0) { @@ -72,11 +96,17 @@ else ThisElement = Salmon.Translator.Decode(Triplets); Error = null; } - catch(Exception e) + catch (Exception e) { Error = $"Erreur: {e}"; return; } + + List events = new(); + await foreach (var e in Salmon.Event.GetEvents(subject: Id, limit: 25)) + events.Add(e); + + Events = events; } diff --git a/Salmon.Web/Pages/EventList.razor b/Salmon.Web/Pages/EventList.razor index 99aa5f5..945f614 100644 --- a/Salmon.Web/Pages/EventList.razor +++ b/Salmon.Web/Pages/EventList.razor @@ -12,6 +12,9 @@ } else { + + + @@ -36,22 +39,34 @@ else @code { private static System.Timers.Timer Time = new System.Timers.Timer(5000); private Event[]? SelectedEvents; + private bool ExcludeValueChange = true; protected override async Task OnInitializedAsync() { await base.OnInitializedAsync(); + await Refresh(); + Time.Elapsed += async (Object? source, System.Timers.ElapsedEventArgs e) => { - await InvokeAsync(() => Refresh()); + await InvokeAsync(async () => await Refresh()); }; Time.AutoReset = true; Time.Enabled = true; } - public void Refresh() + public async Task Refresh() { - SelectedEvents = Salmon.Event.GetLastEvents().ToArray(); + var ev = new List(); + + var filters = new Dictionary(); + if(ExcludeValueChange) + filters.Add("!Type", "state_changed"); + + await foreach (var i in Salmon.Event.GetEvents(filters: filters)) + ev.Add(i); + + SelectedEvents = ev.ToArray(); StateHasChanged(); } diff --git a/Salmon.Web/Program.cs b/Salmon.Web/Program.cs index f0abd30..f26de77 100644 --- a/Salmon.Web/Program.cs +++ b/Salmon.Web/Program.cs @@ -4,8 +4,7 @@ using Salmon.Web; using Salmon.Web.Data; using System.Text.Json.Serialization; -Salmon.Core.Instance CoreInstance = new(); - +Salmon.Core.Instance CoreInstance; var builder = WebApplication.CreateBuilder(args); builder.Services @@ -15,6 +14,7 @@ builder.Services options.JsonSerializerOptions.Converters.Add(new TripletJsonConverter()); }); +builder.Services.AddSwaggerGen(); // Add services to the container. builder.Services.AddRazorPages(); @@ -22,12 +22,21 @@ builder.Services.AddServerSideBlazor(); builder.Services.AddBlazorBootstrap(); - -builder.Services.AddSingleton(CoreInstance); +MongoDbInterface dbInterface = new(); +while (!await dbInterface.Test()) +{ + Console.WriteLine("Connection to mongodb failed, retrying..."); + await Task.Delay(500); +} +builder.Services.AddSingleton(CoreInstance = new Salmon.Core.Instance(dbInterface)); builder.Services.AddSingleton(); +Console.WriteLine("Connected to Mongo database."); +CoreInstance.Persistence = dbInterface; +CoreInstance.State.History = dbInterface; +await CoreInstance.State.PopulateWithHistory(); +builder.Services.AddSingleton(dbInterface); builder.Services.AddSingleton(new CurrentSoftwareRefresher(CoreInstance)); - var app = builder.Build(); // Configure the HTTP request pipeline. @@ -38,6 +47,12 @@ if (!app.Environment.IsDevelopment()) app.UseHsts(); } +app.UseSwagger(); +app.UseSwaggerUI(c => +{ + c.SwaggerEndpoint("/swagger/v1/swagger.json", "Blazor API V1"); +}); + app.UseHttpsRedirection(); app.UseStaticFiles(); @@ -48,4 +63,4 @@ app.MapControllers(); app.MapBlazorHub(); app.MapFallbackToPage("/_Host"); -app.Run(); +app.Run(); \ No newline at end of file diff --git a/Salmon.Web/Salmon.Web.csproj b/Salmon.Web/Salmon.Web.csproj index 5708465..49c695b 100644 --- a/Salmon.Web/Salmon.Web.csproj +++ b/Salmon.Web/Salmon.Web.csproj @@ -11,6 +11,9 @@ + + +