worked on persistence

This commit is contained in:
taywon18 2024-05-17 18:11:36 +02:00
parent f903be2342
commit d4acba067c
11 changed files with 525 additions and 18 deletions

View File

@ -7,6 +7,7 @@ public class EventManager
public long MaxBufferedEvents { get; set; } = 2000;
Queue<string> _EventIds = new ();
Dictionary<string, Event> _LastEvents = new ();
public Persistence? History { get; set; }
object Mutex = new();
public EventManager()
@ -23,6 +24,7 @@ public class EventManager
_LastEvents.Remove(_EventIds.Dequeue());
}
[Obsolete]
void Set(Event e)
{
ArgumentNullException.ThrowIfNull(e, "e");
@ -46,6 +48,8 @@ public class EventManager
_EventIds.Enqueue(e.UniqueId);
_LastEvents.Add(e.UniqueId, e);
}
if(History is not null)
History.RegisterEvent(e);
Trim();
}
@ -61,4 +65,48 @@ public class EventManager
{
return _LastEvents.Values.OrderByDescending(x => x.When).Take(maxCount).ToList();
}
public async IAsyncEnumerable<Event> GetEvents(string? subject = null,
DateTime? from = null,
DateTime? to = null,
int? limit = null,
int? offset = null,
Dictionary<string, object>? filters = null)
{
if(History is null)
{
IEnumerable<Event> ret;
lock (Mutex)
{
ret = _LastEvents
.Values
.OrderByDescending(x => x.When);
}
if(subject is not null)
ret = ret.Where(x => x.ThrowerId == subject);
if (from is not null)
ret = ret.Where(x => x.When >= from);
if (to is not null)
ret = ret.Where(x => x.When <= to);
if (offset is not null)
ret = ret.Skip(offset.Value);
if (limit is not null)
ret = ret.Take(limit.Value);
foreach (var i in ret)
yield return i;
//TODO: Implement filters
yield break;
}
await foreach (var i in History.GetEvents(subject, from, to, limit, offset, filters))
yield return i;
}
}

View File

@ -25,9 +25,14 @@ public abstract class Persistence
}).ConfigureAwait(false);
}
public abstract Task<object> GetLastValue(string subject, string predicate);
public abstract Task<object?> GetLastValue(string subject, string predicate);
public abstract IAsyncEnumerable<Triplet> GetState();
protected abstract Task RegisterStateChange(string subject, string predicate, DateTime when, object? value);
public abstract Task RegisterEvent(Event e);
public abstract Task<Event?> GetEvent(string id);
public abstract IAsyncEnumerable<Event> GetEvents(string? subject = null, DateTime? from = null, DateTime? to = null, int? limit = null, int? offset = null, Dictionary<string, object>? filters = null);
protected virtual void ExceptionCatched(Exception e)
{
Console.WriteLine(e.ToString());

View File

@ -25,10 +25,15 @@ public class StateMachine
public async Task PopulateWithHistory()
{
if (History is not null)
throw new NotImplementedException();
Populated = true;
if (History is not null)
await foreach (var i in History.GetState())
Push(i);
}
public void Push(Triplet triplet)
{
Push(triplet.key, triplet.predicate, triplet.value, triplet.LastFlush);
}
/// <summary>

View File

@ -15,6 +15,7 @@ public class Instance
Persistence = persistence;
State.Events = Event;
State.History = persistence;
Event.History = persistence;
}
public async Task Init()

View File

@ -1,4 +1,6 @@
using System.Net.Http.Json;
using System.Security.Cryptography;
using System.Xml.Linq;
namespace Salmon.Core;
@ -6,6 +8,7 @@ public class Transmitter
{
Cliff.Translator Translator { get; } = new();
HttpClient Client { get; } = new();
public Uri? BaseUrl { get; set; } = null;
public async Task SendAsync(Uri uri, IEnumerable<Element> elements, CancellationToken tk = default)
{
@ -15,4 +18,39 @@ public class Transmitter
await Client.PostAsJsonAsync(uri, triplets, cancellationToken: tk);
}
public async Task SendAsync(Uri uri, IEnumerable<Event> events, CancellationToken tk = default)
{
await Client.PostAsJsonAsync(uri, events, cancellationToken: tk);
}
public async Task SendAsync(IEnumerable<Element> elements, CancellationToken tk = default)
{
if (BaseUrl is null)
throw new Exception("SendAsync call without defining BaseUrl.");
Uri uri = new Uri(BaseUrl, "Push/Elements");
await Client.PostAsJsonAsync(uri, elements, tk);
}
public async Task SendAsync(Element element, CancellationToken tk = default)
{
await SendAsync(new[] { element });
}
public async Task SendAsync(IEnumerable<Event> events, CancellationToken tk = default)
{
if (BaseUrl is null)
throw new Exception("SendAsync call without defining BaseUrl.");
Uri uri = new Uri(BaseUrl, "Push/Events");
await Client.PostAsJsonAsync(uri, events, tk);
}
public async Task SendAsync(Event ev, CancellationToken tk = default)
{
await SendAsync(new[] { ev });
}
}

View File

@ -0,0 +1,12 @@
{
"version": 1,
"isRoot": true,
"tools": {
"dotnet-ef": {
"version": "8.0.4",
"commands": [
"dotnet-ef"
]
}
}
}

View File

@ -0,0 +1,335 @@
using Microsoft.AspNetCore.Mvc.Diagnostics;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using Salmon.Core;
using System;
using System.Collections;
using System.Security.Cryptography;
namespace Salmon.Web.Data;
public class MongoDbInterface
: Salmon.Core.Depth.Persistence
{
private enum Constraint
{
Equal,
NotEqual,
Greater,
GreaterOrEqual,
Less,
LessOrEqual
}
string ConnectionString = "mongodb://Salmon.Web:28z6gRwbAB22rL@voie93quarts.fr:27017/Salmon";
public MongoClient Client { get; private set; }
public IMongoDatabase Database { get; private set; }
public const string TripletsCollectionName = "Triplets";
public const string EventsCollectionName = "Events";
public IMongoCollection<BsonDocument> TripletsCollection { get; private set; }
public IMongoCollection<BsonDocument> EventsCollection { get; private set; }
public MongoDbInterface()
{
Client = new (ConnectionString);
Database = Client.GetDatabase("Salmon");
TripletsCollection = Database.GetCollection<BsonDocument>(TripletsCollectionName);
EventsCollection = Database.GetCollection<BsonDocument>(EventsCollectionName);
}
public async Task<bool> Test()
{
try
{
await Database.RunCommandAsync((Command<BsonDocument>)"{ping:1}");
return true;
}
catch (Exception ex)
{
Console.WriteLine($"Mongodb connection test failed: {ex}");
return false;
}
}
public override async Task<object?> GetLastValue(string subject, string predicate)
{
var filter = Builders<BsonDocument>.Filter.And(new[]
{
Builders<BsonDocument>.Filter.Eq("Subject", subject),
Builders<BsonDocument>.Filter.Eq("Predicate", predicate)
});
var sort = Builders<BsonDocument>.Sort.Ascending("When");
using (var cursor = await TripletsCollection.FindAsync(filter, new()
{
Sort = sort,
Limit = 1
}))
{
if (!await cursor.MoveNextAsync())
return null;
var batch = cursor.Current;
foreach(var i in batch)
{
if (!i.Contains("Object"))
throw new Exception("Response value doesn't have any Object field");
return DeserializeRawValue(i["Object"]);
}
return null;
}
}
protected override async Task RegisterStateChange(string subject, string predicate, DateTime when, object? value)
{
await TripletsCollection.InsertOneAsync(
new BsonDocument(new[]
{
new KeyValuePair<string, object>("Subject", subject),
new KeyValuePair<string, object>("Predicate", predicate),
new KeyValuePair<string, object>("When", when),
new KeyValuePair<string, object>("Object", value)
}
));
}
public override async IAsyncEnumerable<Triplet> GetState()
{
var src = TripletsCollection
.Aggregate()
.Group(new BsonDocument
{
{ "_id", new BsonDocument
{
{ "Subject", "$Subject" },
{ "Predicate", "$Predicate" }
} },
{ "Subject", new BsonDocument("$first", "$Subject") },
{ "Predicate", new BsonDocument("$first", "$Predicate") },
{ "When", new BsonDocument("$first", "$When") },
{ "Object", new BsonDocument("$first", "$Object") }
});
using (var cursor = await src.ToCursorAsync())
{
if (!await cursor.MoveNextAsync())
yield break;
var batch = cursor.Current;
foreach (var i in batch)
{
if (!i.Contains("Subject"))
throw new Exception("Response value doesn't have any Subject field");
if (!i.Contains("Predicate"))
throw new Exception("Response value doesn't have any Predicate field");
if (!i.Contains("Object"))
throw new Exception("Response value doesn't have any Object field");
if (!i.Contains("When"))
throw new Exception("Response value doesn't have any When field");
yield return new Triplet
{
key = i[1].AsString,
predicate = i[2].AsString,
LastFlush = i[3].ToUniversalTime(),
value = DeserializeRawValue(i[4]),
};
}
}
}
static private object? DeserializeRawValue(BsonValue v)
{
if (v.IsBsonNull)
return null;
if (v.IsBoolean)
return v.AsBoolean;
if (v.IsString)
return v.AsString;
if (v.IsValidDateTime)
return v.ToUniversalTime();
if (v.IsInt32)
return v.AsInt32;
if (v.IsInt64)
return v.AsInt64;
if (v.IsDouble)
return v.AsDouble;
if (v.IsDecimal128)
return v.AsDecimal128;
throw new NotImplementedException();
}
static private BsonValue SerializeRawValue(object? value)
{
if (value == null)
return BsonNull.Value;
if (value is bool) return (bool)value;
if (value is string) return (string)value;
if (value is DateTime) return (DateTime)value;
if (value is byte) return (byte)value;
if (value is short) return (short)value;
if(value is ushort) return (ushort)value;
if (value is int) return (int)value;
if(value is uint) return (uint)value;
if (value is long) return (long)value;
if(value is float) return (float)value;
if(value is double) return (double)value;
if(value is decimal) return (decimal)value;
throw new NotImplementedException();
}
public override async Task RegisterEvent(Event e)
{
var properties = new BsonDocument();
foreach (var kv in e.Properties)
properties.Add(kv.Key, SerializeRawValue(kv.Value));
var bson = new BsonDocument(new[]
{
new KeyValuePair<string, object>("_id", e.UniqueId),
new KeyValuePair<string, object>("Thrower", e.ThrowerId),
new KeyValuePair<string, object>("Type", e.Type),
new KeyValuePair<string, object>("When", e.When),
new KeyValuePair<string, object>("Properties", properties),
});
await EventsCollection.InsertOneAsync(bson);
}
public override Task<Event?> GetEvent(string id)
{
throw new NotImplementedException();
}
private Event EventFromBson(BsonDocument bson)
{
Event ret = new(bson["_id"].AsString, bson["Thrower"].AsString, bson["Type"].AsString, bson["When"].ToUniversalTime());
foreach (var kv in bson["Properties"].AsBsonDocument)
ret.Properties.Add(kv.Name, kv.Value);
return ret;
}
private bool isMainProperty(string name)
{
if (name == "Type")
return true;
return false;
}
public override async IAsyncEnumerable<Event> GetEvents(string? subject = null, DateTime? from = null, DateTime? to = null, int? limit = null, int? offset = null, Dictionary<string, object>? filters = null)
{
var conditions = new List<FilterDefinition<BsonDocument>>();
if (subject is not null)
conditions.Add(Builders<BsonDocument>.Filter.Eq("Thrower", subject));
if(filters is not null)
{
Func<Constraint, string, object, FilterDefinition<BsonDocument>> createFilter = (Constraint c, string property, object value) =>
{
if (c == Constraint.Equal)
return Builders<BsonDocument>.Filter.Eq(property, value);
else if (c == Constraint.NotEqual)
return Builders<BsonDocument>.Filter.Not(Builders<BsonDocument>.Filter.Eq(property, value));
else if (c == Constraint.Less)
return Builders<BsonDocument>.Filter.Lt(property, value);
else if (c == Constraint.LessOrEqual)
return Builders<BsonDocument>.Filter.Lte(property, value);
else if (c == Constraint.Greater)
return Builders<BsonDocument>.Filter.Gt(property, value);
else if (c == Constraint.GreaterOrEqual)
return Builders<BsonDocument>.Filter.Gte(property, value);
throw new NotImplementedException();
};
foreach (var f in filters)
{
string property;
Constraint constraint;
if (f.Key.StartsWith("!"))
{
property = f.Key.Substring(1);
constraint = Constraint.NotEqual;
}
else if (f.Key.StartsWith("<"))
{
property = f.Key.Substring(1);
constraint = Constraint.Less;
}
else if (f.Key.StartsWith("<="))
{
property = f.Key.Substring(2);
constraint = Constraint.LessOrEqual;
}
else if (f.Key.StartsWith(">"))
{
property = f.Key.Substring(1);
constraint = Constraint.Greater;
}
else if (f.Key.StartsWith(">="))
{
property = f.Key.Substring(2);
constraint = Constraint.GreaterOrEqual;
}
else
{
property = f.Key;
constraint = Constraint.Equal;
}
bool isMain = isMainProperty(property);
if (isMain)
conditions.Add(createFilter(constraint, property, f.Value));
else
conditions.Add(Builders<BsonDocument>.Filter.Eq("Properties", createFilter(constraint, property, f.Value)));
}
}
var filter = Builders<BsonDocument>.Filter.And(conditions);
var sort = Builders<BsonDocument>.Sort.Descending("When");
using (var cursor = await EventsCollection.FindAsync(filter, new()
{
Sort = sort,
Limit = limit,
}))
{
if (!await cursor.MoveNextAsync())
yield break;
var batch = cursor.Current;
foreach (var i in batch)
{
yield return EventFromBson(i);
}
}
}
}

View File

@ -31,6 +31,7 @@ else
</tr>
</thead>
<tbody>
@if (Triplets is not null)
@foreach (var t in Triplets)
{
<tr>
@ -41,8 +42,26 @@ else
}
</tbody>
</table>
<h4>Évènements</h4>
<h4>Évènements</h4>
<table class="table">
<thead>
<tr>
<th>Date</th>
<th>Type</th>
</tr>
</thead>
<tbody>
@if(Events is not null)
@foreach (var e in Events)
{
<tr>
<td>@e.When</td>
<td>@e.Type</td>
</tr>
}
</tbody>
</table>
}
@code {
@ -60,6 +79,11 @@ else
{
await base.OnInitializedAsync();
await Refresh();
}
private async Task Refresh()
{
Triplets = Salmon.State.Get(Id).ToList();
if (Triplets.Count == 0)
{
@ -77,6 +101,12 @@ else
Error = $"Erreur: {e}";
return;
}
List<Event> events = new();
await foreach (var e in Salmon.Event.GetEvents(subject: Id, limit: 25))
events.Add(e);
Events = events;
}

View File

@ -12,6 +12,9 @@
}
else
{
<input type="checkbox" @bind="ExcludeValueChange" id="exclude-value-change">
<label for="exclude-value-change">Exclude value change</label>
<table class="table">
<thead>
<tr>
@ -36,22 +39,34 @@ else
@code {
private static System.Timers.Timer Time = new System.Timers.Timer(5000);
private Event[]? SelectedEvents;
private bool ExcludeValueChange = true;
protected override async Task OnInitializedAsync()
{
await base.OnInitializedAsync();
await Refresh();
Time.Elapsed += async (Object? source, System.Timers.ElapsedEventArgs e) =>
{
await InvokeAsync(() => Refresh());
await InvokeAsync(async () => await Refresh());
};
Time.AutoReset = true;
Time.Enabled = true;
}
public void Refresh()
public async Task Refresh()
{
SelectedEvents = Salmon.Event.GetLastEvents().ToArray();
var ev = new List<Event>();
var filters = new Dictionary<string, object>();
if(ExcludeValueChange)
filters.Add("!Type", "state_changed");
await foreach (var i in Salmon.Event.GetEvents(filters: filters))
ev.Add(i);
SelectedEvents = ev.ToArray();
StateHasChanged();
}

View File

@ -4,8 +4,7 @@ using Salmon.Web;
using Salmon.Web.Data;
using System.Text.Json.Serialization;
Salmon.Core.Instance CoreInstance = new();
Salmon.Core.Instance CoreInstance;
var builder = WebApplication.CreateBuilder(args);
builder.Services
@ -15,6 +14,7 @@ builder.Services
options.JsonSerializerOptions.Converters.Add(new TripletJsonConverter());
});
builder.Services.AddSwaggerGen();
// Add services to the container.
builder.Services.AddRazorPages();
@ -22,12 +22,21 @@ builder.Services.AddServerSideBlazor();
builder.Services.AddBlazorBootstrap();
builder.Services.AddSingleton(CoreInstance);
MongoDbInterface dbInterface = new();
while (!await dbInterface.Test())
{
Console.WriteLine("Connection to mongodb failed, retrying...");
await Task.Delay(500);
}
builder.Services.AddSingleton(CoreInstance = new Salmon.Core.Instance(dbInterface));
builder.Services.AddSingleton<WeatherForecastService>();
Console.WriteLine("Connected to Mongo database.");
CoreInstance.Persistence = dbInterface;
CoreInstance.State.History = dbInterface;
await CoreInstance.State.PopulateWithHistory();
builder.Services.AddSingleton(dbInterface);
builder.Services.AddSingleton(new CurrentSoftwareRefresher(CoreInstance));
var app = builder.Build();
// Configure the HTTP request pipeline.
@ -38,6 +47,12 @@ if (!app.Environment.IsDevelopment())
app.UseHsts();
}
app.UseSwagger();
app.UseSwaggerUI(c =>
{
c.SwaggerEndpoint("/swagger/v1/swagger.json", "Blazor API V1");
});
app.UseHttpsRedirection();
app.UseStaticFiles();

View File

@ -11,6 +11,9 @@
<ItemGroup>
<PackageReference Include="Blazor.Bootstrap" Version="2.1.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
<PackageReference Include="MongoDB.Driver" Version="2.25.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageReference Include="Swashbuckle.AspNetCore.Swagger" Version="6.5.0" />
</ItemGroup>
<ItemGroup>