using Microsoft.AspNetCore.Mvc.Diagnostics;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using Salmon.Core;
using System;
using System.Collections;
using System.Security.Cryptography;

namespace Salmon.Web.Data;

/// <summary>
/// MongoDB-backed implementation of <see cref="Salmon.Core.Depth.Persistence"/>.
/// State changes are stored as documents in the "Triplets" collection
/// (fields Subject / Predicate / When / Object) and events in the "Events"
/// collection (fields _id / Thrower / Type / When / Properties).
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments on the overridden members
/// (<c>Task&lt;object?&gt;</c>, <c>IAsyncEnumerable&lt;Triplet&gt;</c>,
/// <c>Task&lt;Event?&gt;</c>, <c>Dictionary&lt;string, object&gt;</c>) must match the
/// declarations in <c>Persistence</c> exactly - confirm against the base class.
/// </remarks>
public class MongoDbInterface : Salmon.Core.Depth.Persistence
{
    /// <summary>Comparison operators that can be requested through the filter keys of <see cref="GetEvents"/>.</summary>
    private enum Constraint
    {
        Equal,
        NotEqual,
        Greater,
        GreaterOrEqual,
        Less,
        LessOrEqual
    }

    public MongoClient Client { get; private set; }

    public IMongoDatabase Database { get; private set; }

    public const string TripletsCollectionName = "Triplets";

    public const string EventsCollectionName = "Events";

    public IMongoCollection<BsonDocument> TripletsCollection { get; private set; }

    public IMongoCollection<BsonDocument> EventsCollection { get; private set; }

    /// <summary>
    /// Creates the client and resolves the "Salmon" database and its two collections.
    /// </summary>
    /// <param name="conf">Configuration providing the <c>Persistence:ConnectionString</c> entry.</param>
    /// <exception cref="InvalidOperationException">The connection string is not configured.</exception>
    public MongoDbInterface(IConfiguration conf)
    {
        var connectionStr = conf["Persistence:ConnectionString"];
        if (connectionStr is null)
        {
            throw new InvalidOperationException("Cannot create persistence module without connection string.");
        }

        Client = new MongoClient(connectionStr);
        Database = Client.GetDatabase("Salmon");
        TripletsCollection = Database.GetCollection<BsonDocument>(TripletsCollectionName);
        EventsCollection = Database.GetCollection<BsonDocument>(EventsCollectionName);
    }

    /// <summary>
    /// Checks connectivity by listing the databases and inspecting the cluster state.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the round trip succeeds and at least one server reports
    /// <c>Connected</c>; <c>false</c> otherwise. Failures are written to the console.
    /// </returns>
    public async Task<bool> Test()
    {
        try
        {
            // The round trip is inside the try block and awaited (no blocking .Result),
            // so a connection failure is reported as 'false' instead of escaping.
            using var databases = await Client.ListDatabasesAsync();
            await databases.MoveNextAsync();

            // Any() instead of Single(): a cluster may describe several servers.
            return Client.Cluster.Description.Servers.Any(
                server => server.State == MongoDB.Driver.Core.Servers.ServerState.Connected);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Mongodb connection test failed: {ex}");
            return false;
        }
    }

    /// <summary>
    /// Returns the most recently registered value for a subject/predicate pair.
    /// </summary>
    /// <param name="subject">Value matched against the "Subject" field.</param>
    /// <param name="predicate">Value matched against the "Predicate" field.</param>
    /// <returns>The deserialized "Object" field of the newest matching document, or <c>null</c> when none exists.</returns>
    /// <exception cref="InvalidOperationException">The matching document has no "Object" field.</exception>
    public override async Task<object?> GetLastValue(string subject, string predicate)
    {
        var filter = Builders<BsonDocument>.Filter.And(
            Builders<BsonDocument>.Filter.Eq("Subject", subject),
            Builders<BsonDocument>.Filter.Eq("Predicate", predicate));

        var options = new FindOptions<BsonDocument>
        {
            // Newest first: with Limit = 1 this yields the *last* registered value.
            Sort = Builders<BsonDocument>.Sort.Descending("When"),
            Limit = 1,
        };

        using var cursor = await TripletsCollection.FindAsync(filter, options);
        while (await cursor.MoveNextAsync())
        {
            foreach (var document in cursor.Current)
            {
                return DeserializeRawValue(RequireField(document, "Object"));
            }
        }

        return null;
    }

    /// <summary>
    /// Appends one state-change document to the triplets collection.
    /// </summary>
    protected override async Task RegisterStateChange(string subject, string predicate, DateTime when, object? value)
    {
        var document = new BsonDocument(new[]
        {
            new KeyValuePair<string, object?>("Subject", subject),
            new KeyValuePair<string, object?>("Predicate", predicate),
            new KeyValuePair<string, object?>("When", when),
            new KeyValuePair<string, object?>("Object", value),
        });

        await TripletsCollection.InsertOneAsync(document);
    }

    /// <summary>
    /// Streams the newest triplet of every (Subject, Predicate) pair.
    /// </summary>
    /// <exception cref="InvalidOperationException">A grouped document lacks one of the expected fields.</exception>
    public override async IAsyncEnumerable<Triplet> GetState()
    {
        var pipeline = TripletsCollection
            // Sorting the whole collection can exceed the in-memory sort limit.
            .Aggregate(new AggregateOptions { AllowDiskUse = true })
            // $first is only meaningful on sorted input; newest first makes it pick the latest value.
            .Sort(Builders<BsonDocument>.Sort.Descending("When"))
            .Group(new BsonDocument
            {
                { "_id", new BsonDocument { { "Subject", "$Subject" }, { "Predicate", "$Predicate" } } },
                { "Subject", new BsonDocument("$first", "$Subject") },
                { "Predicate", new BsonDocument("$first", "$Predicate") },
                { "When", new BsonDocument("$first", "$When") },
                { "Object", new BsonDocument("$first", "$Object") }
            });

        using var cursor = await pipeline.ToCursorAsync();

        // Walk every batch, not just the first one the server returns.
        while (await cursor.MoveNextAsync())
        {
            foreach (var document in cursor.Current)
            {
                // Fields are read by name rather than by position in the grouped document.
                var subject = RequireField(document, "Subject");
                var predicate = RequireField(document, "Predicate");
                var rawValue = RequireField(document, "Object");
                var when = RequireField(document, "When");

                yield return new Triplet
                {
                    key = subject.AsString,
                    predicate = predicate.AsString,
                    LastFlush = when.ToUniversalTime(),
                    value = DeserializeRawValue(rawValue),
                };
            }
        }
    }

    /// <summary>Returns the named field of <paramref name="document"/> or throws when it is absent.</summary>
    private static BsonValue RequireField(BsonDocument document, string name)
    {
        if (!document.TryGetValue(name, out var value))
        {
            throw new InvalidOperationException($"Response value doesn't have any {name} field");
        }

        return value;
    }

    /// <summary>
    /// Converts a stored BSON scalar into a CLR value.
    /// </summary>
    /// <exception cref="NotImplementedException">The BSON type is not one of the handled scalar types.</exception>
    private static object? DeserializeRawValue(BsonValue v)
    {
        if (v.IsBsonNull) return null;
        if (v.IsBoolean) return v.AsBoolean;
        if (v.IsString) return v.AsString;
        if (v.IsValidDateTime) return v.ToUniversalTime();
        if (v.IsInt32) return v.AsInt32;
        if (v.IsInt64) return v.AsInt64;
        if (v.IsDouble) return v.AsDouble;
        if (v.IsDecimal128) return v.AsDecimal128;

        throw new NotImplementedException($"Unsupported BSON type: {v.BsonType}");
    }

    /// <summary>
    /// Converts a CLR scalar into the BSON value used for event properties.
    /// </summary>
    /// <exception cref="NotImplementedException">The CLR type is not one of the handled scalar types.</exception>
    private static BsonValue SerializeRawValue(object? value)
    {
        // The widening casts spell out the conversions the implicit BsonValue operators apply.
        return value switch
        {
            null => BsonNull.Value,
            bool b => (BsonValue)b,
            string s => (BsonValue)s,
            DateTime d => (BsonValue)d,
            byte n => (BsonValue)(int)n,
            short n => (BsonValue)(int)n,
            ushort n => (BsonValue)(int)n,
            int n => (BsonValue)n,
            uint n => (BsonValue)(long)n,
            long n => (BsonValue)n,
            float f => (BsonValue)(double)f,
            double f => (BsonValue)f,
            decimal m => (BsonValue)m,
            _ => throw new NotImplementedException($"Unsupported value type: {value.GetType()}"),
        };
    }

    /// <summary>
    /// Inserts one event document; every property value goes through <see cref="SerializeRawValue"/>.
    /// </summary>
    public override async Task RegisterEvent(Event e)
    {
        var properties = new BsonDocument();
        foreach (var kv in e.Properties)
        {
            properties.Add(kv.Key, SerializeRawValue(kv.Value));
        }

        var bson = new BsonDocument(new[]
        {
            new KeyValuePair<string, object?>("_id", e.UniqueId),
            new KeyValuePair<string, object?>("Thrower", e.ThrowerId),
            new KeyValuePair<string, object?>("Type", e.Type),
            new KeyValuePair<string, object?>("When", e.When),
            new KeyValuePair<string, object?>("Properties", properties),
        });

        await EventsCollection.InsertOneAsync(bson);
    }

    /// <summary>Not implemented for this backend.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public override Task<Event?> GetEvent(string id)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Rebuilds an <see cref="Event"/> from a stored document.
    /// </summary>
    private static Event EventFromBson(BsonDocument bson)
    {
        Event ret = new(
            bson["_id"].AsString,
            bson["Thrower"].AsString,
            bson["Type"].AsString,
            bson["When"].ToUniversalTime());

        foreach (var element in bson["Properties"].AsBsonDocument)
        {
            // Mirror RegisterEvent: hand back CLR values instead of raw BsonValue instances.
            ret.Properties.Add(element.Name, DeserializeRawValue(element.Value));
        }

        return ret;
    }

    /// <summary>
    /// Tells whether a filter name targets a top-level event field rather than
    /// an entry of the "Properties" sub-document.
    /// </summary>
    private static bool IsMainProperty(string name) => name == "Type";

    /// <summary>
    /// Splits a filter key into its operator prefix and property name.
    /// Recognised prefixes are "!", "&lt;=", "&lt;", "&gt;=" and "&gt;"; no prefix means equality.
    /// </summary>
    private static (Constraint Constraint, string Property) ParseFilterKey(string key)
    {
        if (key.StartsWith("!", StringComparison.Ordinal)) return (Constraint.NotEqual, key.Substring(1));

        // Two-character operators are tested first; otherwise "<" / ">" would swallow them.
        if (key.StartsWith("<=", StringComparison.Ordinal)) return (Constraint.LessOrEqual, key.Substring(2));
        if (key.StartsWith("<", StringComparison.Ordinal)) return (Constraint.Less, key.Substring(1));
        if (key.StartsWith(">=", StringComparison.Ordinal)) return (Constraint.GreaterOrEqual, key.Substring(2));
        if (key.StartsWith(">", StringComparison.Ordinal)) return (Constraint.Greater, key.Substring(1));

        return (Constraint.Equal, key);
    }

    /// <summary>Builds the filter that applies <paramref name="constraint"/> to <paramref name="field"/>.</summary>
    private static FilterDefinition<BsonDocument> BuildConstraint(Constraint constraint, string field, object value)
    {
        var builder = Builders<BsonDocument>.Filter;
        return constraint switch
        {
            Constraint.Equal => builder.Eq(field, value),
            Constraint.NotEqual => builder.Not(builder.Eq(field, value)),
            Constraint.Less => builder.Lt(field, value),
            Constraint.LessOrEqual => builder.Lte(field, value),
            Constraint.Greater => builder.Gt(field, value),
            Constraint.GreaterOrEqual => builder.Gte(field, value),
            _ => throw new NotImplementedException(),
        };
    }

    /// <summary>
    /// Streams stored events, newest first.
    /// </summary>
    /// <param name="subject">When set, only events whose "Thrower" equals this value.</param>
    /// <param name="from">When set, only events with "When" at or after this instant.</param>
    /// <param name="to">When set, only events with "When" at or before this instant.</param>
    /// <param name="limit">Maximum number of events to return.</param>
    /// <param name="offset">Number of events to skip before returning results.</param>
    /// <param name="filters">
    /// Extra constraints keyed by property name, optionally prefixed with an operator
    /// (see <see cref="ParseFilterKey"/>). "Type" targets the top-level field; any other
    /// name targets the matching entry under "Properties".
    /// </param>
    public override async IAsyncEnumerable<Event> GetEvents(string? subject = null, DateTime? from = null, DateTime? to = null, int? limit = null, int? offset = null, Dictionary<string, object>? filters = null)
    {
        var builder = Builders<BsonDocument>.Filter;
        var conditions = new List<FilterDefinition<BsonDocument>>();

        if (subject is not null)
        {
            conditions.Add(builder.Eq("Thrower", subject));
        }

        // The time window is applied to the query (both bounds inclusive).
        if (from.HasValue)
        {
            conditions.Add(builder.Gte("When", from.Value));
        }

        if (to.HasValue)
        {
            conditions.Add(builder.Lte("When", to.Value));
        }

        if (filters is not null)
        {
            foreach (var entry in filters)
            {
                var (constraint, property) = ParseFilterKey(entry.Key);

                // Custom properties live in the "Properties" sub-document, so address them by dotted path.
                var field = IsMainProperty(property) ? property : $"Properties.{property}";
                conditions.Add(BuildConstraint(constraint, field, entry.Value));
            }
        }

        FilterDefinition<BsonDocument> filter = conditions.Count > 0
            ? builder.And(conditions)
            : FilterDefinition<BsonDocument>.Empty;

        var options = new FindOptions<BsonDocument>
        {
            Sort = Builders<BsonDocument>.Sort.Descending("When"),
            Limit = limit,
            Skip = offset,
        };

        using var cursor = await EventsCollection.FindAsync(filter, options);

        // Walk every batch, not just the first one the server returns.
        while (await cursor.MoveNextAsync())
        {
            foreach (var document in cursor.Current)
            {
                yield return EventFromBson(document);
            }
        }
    }
}