369 lines
12 KiB
C#
369 lines
12 KiB
C#
using Microsoft.AspNetCore.Mvc.Diagnostics;
|
|
using MongoDB.Bson;
|
|
using MongoDB.Bson.Serialization;
|
|
using MongoDB.Driver;
|
|
using Salmon.Core;
|
|
using System;
|
|
using System.Collections;
|
|
using System.Security.Cryptography;
|
|
|
|
namespace Salmon.Web.Data;
|
|
|
|
public class MongoDbInterface
|
|
: Salmon.Core.Depth.Persistence
|
|
{
|
|
/// <summary>
/// Comparison operators that can be applied to a single property filter
/// when querying events.
/// </summary>
private enum Constraint
{
    /// <summary>Property must equal the supplied value.</summary>
    Equal,

    /// <summary>Property must differ from the supplied value.</summary>
    NotEqual,

    /// <summary>Property must be strictly greater than the supplied value.</summary>
    Greater,

    /// <summary>Property must be greater than or equal to the supplied value.</summary>
    GreaterOrEqual,

    /// <summary>Property must be strictly less than the supplied value.</summary>
    Less,

    /// <summary>Property must be less than or equal to the supplied value.</summary>
    LessOrEqual,
}
|
|
|
|
/// <summary>Name of the MongoDB collection that stores state triplets.</summary>
public const string TripletsCollectionName = "Triplets";

/// <summary>Name of the MongoDB collection that stores events.</summary>
public const string EventsCollectionName = "Events";

/// <summary>MongoDB client built from the configured connection string.</summary>
public MongoClient Client { get; private set; }

/// <summary>Database handle obtained from <see cref="Client"/>.</summary>
public IMongoDatabase Database { get; private set; }

/// <summary>
/// Collection of Subject/Predicate/When/Object documents
/// (see <see cref="TripletsCollectionName"/>).
/// </summary>
public IMongoCollection<BsonDocument> TripletsCollection { get; private set; }

/// <summary>
/// Collection of event documents (see <see cref="EventsCollectionName"/>).
/// </summary>
public IMongoCollection<BsonDocument> EventsCollection { get; private set; }
|
|
|
|
/// <summary>
/// Creates the persistence module and binds it to the "Salmon" database.
/// </summary>
/// <param name="conf">
/// Application configuration; the connection string is read from the
/// <c>Persistence:ConnectionString</c> key.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="conf"/> is null.</exception>
/// <exception cref="InvalidOperationException">
/// The connection string is missing, empty or whitespace.
/// </exception>
public MongoDbInterface(IConfiguration conf)
{
    ArgumentNullException.ThrowIfNull(conf);

    var connectionStr = conf["Persistence:ConnectionString"];

    // An empty or blank value is as unusable as a missing one; fail early with a
    // clear message instead of letting the driver reject it later.
    if (string.IsNullOrWhiteSpace(connectionStr))
    {
        throw new InvalidOperationException("Cannot create persistence module without connection string.");
    }

    Client = new MongoClient(connectionStr);
    Database = Client.GetDatabase("Salmon");

    TripletsCollection = Database.GetCollection<BsonDocument>(TripletsCollectionName);
    EventsCollection = Database.GetCollection<BsonDocument>(EventsCollectionName);
}
|
|
|
|
/// <summary>
/// Probes the MongoDB connection by listing databases and then inspecting the
/// cluster description.
/// </summary>
/// <returns>
/// <c>true</c> when the round trip succeeds and at least one server reports the
/// <c>Connected</c> state; <c>false</c> otherwise, including when the probe throws.
/// </returns>
public async Task<bool> Test()
{
    try
    {
        // Force a real round trip to the server; awaited (not .Result) so the
        // calling thread is never blocked, and inside the try so that connection
        // failures are reported as 'false' rather than escaping as exceptions.
        using var databases = await Client.ListDatabasesAsync();
        await databases.MoveNextAsync();

        // Any() instead of Single(): replica sets and sharded clusters expose
        // more than one server description.
        return Client.Cluster.Description.Servers
            .Any(server => server.State == MongoDB.Driver.Core.Servers.ServerState.Connected);
    }
    catch (Exception ex)
    {
        // Deliberately broad: this method is a health probe and must not throw.
        Console.WriteLine($"Mongodb connection test failed: {ex}");
        return false;
    }
}
|
|
|
|
/// <summary>
/// Returns the most recently recorded value for a subject/predicate pair.
/// </summary>
/// <param name="subject">Value matched against the <c>Subject</c> field.</param>
/// <param name="predicate">Value matched against the <c>Predicate</c> field.</param>
/// <returns>
/// The deserialized <c>Object</c> field of the newest matching document, or
/// <c>null</c> when no document matches (or the stored value itself is null).
/// </returns>
/// <exception cref="InvalidOperationException">
/// The matching document has no <c>Object</c> field.
/// </exception>
public override async Task<object?> GetLastValue(string subject, string predicate)
{
    var builder = Builders<BsonDocument>.Filter;
    var filter = builder.And(
        builder.Eq("Subject", subject),
        builder.Eq("Predicate", predicate));

    // Newest first: combined with Limit = 1 this yields the *last* recorded value.
    // (An ascending sort would return the oldest one instead.)
    var newestFirst = Builders<BsonDocument>.Sort.Descending("When");

    using var cursor = await TripletsCollection.FindAsync(filter, new()
    {
        Sort = newestFirst,
        Limit = 1
    });

    while (await cursor.MoveNextAsync())
    {
        foreach (var document in cursor.Current)
        {
            if (!document.TryGetValue("Object", out var rawValue))
            {
                throw new InvalidOperationException("Response value doesn't have any Object field");
            }

            return DeserializeRawValue(rawValue);
        }
    }

    return null;
}
|
|
|
|
/// <summary>
/// Appends one document to the triplets collection describing a state change.
/// </summary>
/// <param name="subject">Stored in the <c>Subject</c> field.</param>
/// <param name="predicate">Stored in the <c>Predicate</c> field.</param>
/// <param name="when">Stored in the <c>When</c> field.</param>
/// <param name="value">Stored in the <c>Object</c> field.</param>
protected override async Task RegisterStateChange(string subject, string predicate, DateTime when, object? value)
{
    KeyValuePair<string, object>[] fields =
    {
        new("Subject", subject),
        new("Predicate", predicate),
        new("When", when),
        new("Object", value),
    };

    var document = new BsonDocument(fields);
    await TripletsCollection.InsertOneAsync(document);
}
|
|
|
|
/// <summary>
/// Streams the latest recorded triplet for every distinct subject/predicate pair.
/// </summary>
/// <returns>
/// One <see cref="Triplet"/> per (Subject, Predicate) group, built from the newest
/// document of that group.
/// </returns>
/// <exception cref="InvalidOperationException">
/// A grouped document lacks one of the Subject, Predicate, Object or When fields.
/// </exception>
public override async IAsyncEnumerable<Triplet> GetState()
{
    var pipeline = TripletsCollection
        .Aggregate()
        // $first is only meaningful on ordered input: sort newest-first so that each
        // group keeps its most recent document rather than an arbitrary one.
        .Sort(Builders<BsonDocument>.Sort.Descending("When"))
        .Group(new BsonDocument
        {
            { "_id", new BsonDocument
                {
                    { "Subject", "$Subject" },
                    { "Predicate", "$Predicate" }
                } },
            { "Subject", new BsonDocument("$first", "$Subject") },
            { "Predicate", new BsonDocument("$first", "$Predicate") },
            { "When", new BsonDocument("$first", "$When") },
            { "Object", new BsonDocument("$first", "$Object") }
        });

    using var cursor = await pipeline.ToCursorAsync();

    // Drain every batch; reading only the first one silently truncates large results.
    while (await cursor.MoveNextAsync())
    {
        foreach (var document in cursor.Current)
        {
            // Read by name (not by position) so the result does not depend on the
            // field order of the grouped document.
            var subject = Require(document, "Subject");
            var predicate = Require(document, "Predicate");
            var rawValue = Require(document, "Object");
            var when = Require(document, "When");

            yield return new Triplet
            {
                key = subject.AsString,
                predicate = predicate.AsString,
                LastFlush = when.ToUniversalTime(),
                value = DeserializeRawValue(rawValue),
            };
        }
    }

    // Returns the named field or throws when the document does not contain it.
    static BsonValue Require(BsonDocument document, string field) =>
        document.TryGetValue(field, out var found)
            ? found
            : throw new InvalidOperationException($"Response value doesn't have any {field} field");
}
|
|
|
|
/// <summary>
/// Converts a stored BSON value into a plain CLR value.
/// </summary>
/// <param name="v">BSON value to convert.</param>
/// <returns>
/// <c>null</c> for BSON null; otherwise a bool, string, UTC <see cref="DateTime"/>,
/// int, long, double or Decimal128 depending on the BSON type.
/// </returns>
/// <exception cref="NotImplementedException">
/// The BSON type is not one of the supported kinds (or is a date outside the valid
/// <see cref="DateTime"/> range).
/// </exception>
private static object? DeserializeRawValue(BsonValue v)
{
    switch (v.BsonType)
    {
        case BsonType.Null:
            return null;

        case BsonType.Boolean:
            return v.AsBoolean;

        case BsonType.String:
            return v.AsString;

        // Dates that cannot be represented as a DateTime fall through to the default case.
        case BsonType.DateTime when v.IsValidDateTime:
            return v.ToUniversalTime();

        case BsonType.Int32:
            return v.AsInt32;

        case BsonType.Int64:
            return v.AsInt64;

        case BsonType.Double:
            return v.AsDouble;

        case BsonType.Decimal128:
            return v.AsDecimal128;

        default:
            throw new NotImplementedException();
    }
}
|
|
|
|
/// <summary>
/// Converts a plain CLR value into the BSON value used for storage.
/// </summary>
/// <param name="value">
/// A null reference, bool, string, <see cref="DateTime"/>, byte, short, ushort, int,
/// uint, long, float, double or decimal.
/// </param>
/// <returns>The BSON representation; BSON null for a null reference.</returns>
/// <exception cref="NotImplementedException">
/// <paramref name="value"/> is of any other type.
/// </exception>
private static BsonValue SerializeRawValue(object? value)
{
    switch (value)
    {
        case null:
            return BsonNull.Value;

        case bool flag:
            return flag;
        case string text:
            return text;
        case DateTime moment:
            return moment;

        // Integral types
        case byte number:
            return number;
        case short number:
            return number;
        case ushort number:
            return number;
        case int number:
            return number;
        case uint number:
            return number;
        case long number:
            return number;

        // Floating-point and decimal types
        case float real:
            return real;
        case double real:
            return real;
        case decimal real:
            return real;

        default:
            throw new NotImplementedException();
    }
}
|
|
|
|
/// <summary>
/// Stores an event as one document in the events collection.
/// </summary>
/// <param name="e">
/// Event to persist. Its unique id becomes the document <c>_id</c>; its properties
/// are serialized into a nested <c>Properties</c> document.
/// </param>
public override async Task RegisterEvent(Event e)
{
    // Custom properties go into their own sub-document, one element per entry.
    var propertyBag = new BsonDocument();
    foreach (var entry in e.Properties)
    {
        propertyBag.Add(entry.Key, SerializeRawValue(entry.Value));
    }

    KeyValuePair<string, object>[] fields =
    {
        new("_id", e.UniqueId),
        new("Thrower", e.ThrowerId),
        new("Type", e.Type),
        new("Description", e.Description),
        new("Valence", (int)e.Valence),
        new("When", e.When),
        new("Properties", propertyBag),
    };

    var document = new BsonDocument(fields);
    await EventsCollection.InsertOneAsync(document);
}
|
|
|
|
/// <summary>
/// Looks up a single event by its unique id.
/// </summary>
/// <param name="id">Value matched against the document <c>_id</c> field.</param>
/// <returns>The matching event, or <c>null</c> when no document has that id.</returns>
public override async Task<Event?> GetEvent(string id)
{
    // Events are written with their unique id as the document _id.
    var filter = Builders<BsonDocument>.Filter.Eq("_id", id);

    using var cursor = await EventsCollection.FindAsync(filter, new()
    {
        Limit = 1
    });

    while (await cursor.MoveNextAsync())
    {
        foreach (var document in cursor.Current)
        {
            return EventFromBson(document);
        }
    }

    return null;
}
|
|
|
|
/// <summary>
/// Rebuilds an <see cref="Event"/> from its stored BSON document.
/// </summary>
/// <param name="bson">
/// Document with required <c>_id</c>, <c>Thrower</c>, <c>Type</c> and <c>When</c>
/// fields and optional <c>Description</c>, <c>Valence</c> and <c>Properties</c> fields.
/// </param>
/// <returns>The reconstructed event.</returns>
private Event EventFromBson(BsonDocument bson)
{
    Event ret = new(
        bson["_id"].AsString,
        bson["Thrower"].AsString,
        bson["Type"].AsString,
        "",
        bson["When"].ToUniversalTime());

    if (bson.TryGetValue("Description", out var description))
    {
        // A stored BSON null must become an empty description, not the text "BsonNull".
        ret.Description = description.IsBsonNull ? "" : description.ToString() ?? "";
    }

    if (bson.TryGetValue("Valence", out var valence))
    {
        ret.Valence = (Event.ValenceType)valence.ToInt32();
    }

    // Tolerate documents without a Properties sub-document instead of throwing.
    if (bson.TryGetValue("Properties", out var properties) && properties.IsBsonDocument)
    {
        foreach (var element in properties.AsBsonDocument)
        {
            // Properties are written through SerializeRawValue, so convert them back to
            // plain CLR values rather than handing raw BsonValue instances to callers.
            ret.Properties.Add(element.Name, DeserializeRawValue(element.Value));
        }
    }

    return ret;
}
|
|
|
|
/// <summary>
/// Tells whether a filter property refers to a top-level event field rather than an
/// entry of the nested <c>Properties</c> document.
/// </summary>
/// <param name="name">Property name taken from a filter key.</param>
/// <returns><c>true</c> only for <c>"Type"</c>.</returns>
private bool isMainProperty(string name) => name == "Type";
|
|
|
|
/// <summary>
/// Streams stored events, newest first, optionally narrowed by several criteria.
/// </summary>
/// <param name="subject">When set, only events whose <c>Thrower</c> equals this value.</param>
/// <param name="from">When set, only events whose <c>When</c> is at or after this instant.</param>
/// <param name="to">When set, only events whose <c>When</c> is at or before this instant.</param>
/// <param name="limit">Maximum number of events to return; unlimited when null.</param>
/// <param name="offset">Number of leading events to skip; none when null.</param>
/// <param name="valenceType">When set, only events with this valence.</param>
/// <param name="filters">
/// Extra property constraints. Each key is a property name optionally prefixed by an
/// operator: <c>!</c> (not equal), <c>&lt;</c>, <c>&lt;=</c>, <c>&gt;</c>, <c>&gt;=</c>;
/// no prefix means equality. <c>Type</c> addresses the top-level field; every other name
/// addresses an entry of the nested <c>Properties</c> document.
/// </param>
/// <returns>The matching events ordered by <c>When</c> descending.</returns>
public override async IAsyncEnumerable<Event> GetEvents(
    string? subject = null,
    DateTime? from = null,
    DateTime? to = null,
    int? limit = null,
    int? offset = null,
    Event.ValenceType? valenceType = null,
    Dictionary<string, object>? filters = null
    )
{
    var builder = Builders<BsonDocument>.Filter;
    var conditions = new List<FilterDefinition<BsonDocument>>();

    if (subject is not null)
    {
        conditions.Add(builder.Eq("Thrower", subject));
    }

    // Time window (both bounds inclusive).
    if (from is not null)
    {
        conditions.Add(builder.Gte("When", from.Value));
    }

    if (to is not null)
    {
        conditions.Add(builder.Lte("When", to.Value));
    }

    if (valenceType is not null)
    {
        conditions.Add(builder.Eq("Valence", (int)valenceType.Value));
    }

    if (filters is not null)
    {
        foreach (var (key, value) in filters)
        {
            var (constraint, property) = ParseFilterKey(key);

            // Custom properties live in the nested "Properties" document, so they are
            // addressed with a dotted path; main properties are top-level fields.
            var field = isMainProperty(property) ? property : $"Properties.{property}";
            conditions.Add(CreateFilter(constraint, field, value));
        }
    }

    FilterDefinition<BsonDocument> filter = conditions.Count > 0
        ? builder.And(conditions)
        : FilterDefinition<BsonDocument>.Empty;
    var sort = Builders<BsonDocument>.Sort.Descending("When");

    using var cursor = await EventsCollection.FindAsync(filter, new()
    {
        Sort = sort,
        Limit = limit,
        Skip = offset,
    });

    // Drain every batch; reading only the first one silently truncates large results.
    while (await cursor.MoveNextAsync())
    {
        foreach (var document in cursor.Current)
        {
            yield return EventFromBson(document);
        }
    }

    // Splits a filter key into its operator and the bare property name.
    static (Constraint Kind, string Property) ParseFilterKey(string key)
    {
        // Two-character operators must be tested before their one-character prefixes,
        // otherwise "<=" and ">=" would be consumed as "<" and ">".
        if (key.StartsWith("<=", StringComparison.Ordinal))
        {
            return (Constraint.LessOrEqual, key.Substring(2));
        }

        if (key.StartsWith(">=", StringComparison.Ordinal))
        {
            return (Constraint.GreaterOrEqual, key.Substring(2));
        }

        if (key.StartsWith("!", StringComparison.Ordinal))
        {
            return (Constraint.NotEqual, key.Substring(1));
        }

        if (key.StartsWith("<", StringComparison.Ordinal))
        {
            return (Constraint.Less, key.Substring(1));
        }

        if (key.StartsWith(">", StringComparison.Ordinal))
        {
            return (Constraint.Greater, key.Substring(1));
        }

        return (Constraint.Equal, key);
    }

    // Builds the MongoDB filter for one constraint on one field path.
    static FilterDefinition<BsonDocument> CreateFilter(Constraint constraint, string field, object value)
    {
        var f = Builders<BsonDocument>.Filter;
        return constraint switch
        {
            Constraint.Equal => f.Eq(field, value),
            Constraint.NotEqual => f.Not(f.Eq(field, value)),
            Constraint.Less => f.Lt(field, value),
            Constraint.LessOrEqual => f.Lte(field, value),
            Constraint.Greater => f.Gt(field, value),
            Constraint.GreaterOrEqual => f.Gte(field, value),
            _ => throw new NotImplementedException(),
        };
    }
}
|
|
}
|