Salmon/Salmon.Core/Depth/EventManager.cs
2024-05-17 18:11:36 +02:00

112 lines
2.6 KiB
C#

using System.Diagnostics;
namespace Salmon.Core.Depth;
public class EventManager
{
public long MaxBufferedEvents { get; set; } = 2000;
Queue<string> _EventIds = new ();
Dictionary<string, Event> _LastEvents = new ();
public Persistence? History { get; set; }
object Mutex = new();
public EventManager()
{
}
void Trim()
{
List<string> toDelete = new();
lock(Mutex)
while(_LastEvents.Count > MaxBufferedEvents)
_LastEvents.Remove(_EventIds.Dequeue());
}
[Obsolete]
void Set(Event e)
{
ArgumentNullException.ThrowIfNull(e, "e");
lock (Mutex)
{
Debug.Assert(_LastEvents.ContainsKey(e.UniqueId));
_LastEvents[e.UniqueId] = e;
}
}
void Add(Event e)
{
ArgumentNullException.ThrowIfNull (e, "e");
lock (Mutex)
{
Debug.Assert(!_LastEvents.ContainsKey(e.UniqueId));
_EventIds.Enqueue(e.UniqueId);
_LastEvents.Add(e.UniqueId, e);
}
if(History is not null)
History.RegisterEvent(e);
Trim();
}
public void Push(Event e)
{
ArgumentNullException.ThrowIfNull(e, "e");
Add(e);
}
public IEnumerable<Event> GetLastEvents(int maxCount = 200)
{
return _LastEvents.Values.OrderByDescending(x => x.When).Take(maxCount).ToList();
}
public async IAsyncEnumerable<Event> GetEvents(string? subject = null,
DateTime? from = null,
DateTime? to = null,
int? limit = null,
int? offset = null,
Dictionary<string, object>? filters = null)
{
if(History is null)
{
IEnumerable<Event> ret;
lock (Mutex)
{
ret = _LastEvents
.Values
.OrderByDescending(x => x.When);
}
if(subject is not null)
ret = ret.Where(x => x.ThrowerId == subject);
if (from is not null)
ret = ret.Where(x => x.When >= from);
if (to is not null)
ret = ret.Where(x => x.When <= to);
if (offset is not null)
ret = ret.Skip(offset.Value);
if (limit is not null)
ret = ret.Take(limit.Value);
foreach (var i in ret)
yield return i;
//TODO: Implement filters
yield break;
}
await foreach (var i in History.GetEvents(subject, from, to, limit, offset, filters))
yield return i;
}
}