using System; using System.Collections.Generic; using System.Linq; using ThomsonReuters.Desktop.SDK.DataAccess; using ThomsonReuters.Desktop.SDK.DataAccess.TimeSeries; namespace Appathon { public class BulkDataRequestSettings { public DateTime? From { get; set; } public DateTime? To { get; set; } public int? NumberOfPoints { get; set; } public IEnumerable Rics { get; set; } public IEnumerable Fields { get; set; } public bool AllFields { get; set; } public string Feed { get; set; } public string View { get; set; } public Interval Interval { get; set; } public TimezoneType TimeZone { get; set; } public bool AdjustedPrice { get; set; } public bool WithContinuation { get; set; } public bool CorrectedData { get; set; } public Action Filter { get; set; } public Action>> DataReceived { get; set; } public Action> StatusUpdated { get; set; } public BulkDataRequestSettings() { } } public class BulkDataRequestSetup { private readonly BulkDataRequestSettings settings; public BulkDataRequestSetup() { settings = new BulkDataRequestSettings(); } #region ITimeSeriesDataRequestSetup public BulkDataRequestSetup From(DateTime? fromDate) { settings.From = fromDate; return this; } public BulkDataRequestSetup To(DateTime? toDate) { settings.To = toDate; return this; } public BulkDataRequestSetup WithFeed(string feed) { settings.Feed = feed; return this; } public BulkDataRequestSetup WithRics(string ric, params string[] additionalRics) { settings.Rics = additionalRics.Concat(new[] { ric }); return this; } public BulkDataRequestSetup WithInterval(Interval interval) { settings.Interval = interval; return this; } public BulkDataRequestSetup WithNumberOfPoints(int? numberOfPoints) { settings.NumberOfPoints = numberOfPoints; return this; } public BulkDataRequestSetup WithAdjustedPrice(bool isAdjustedPrice) { settings.AdjustedPrice = isAdjustedPrice; return this; } public BulkDataRequestSetup WithView(string view) { settings.View = view; return this; } public BulkDataRequestSetup WithTimeZone(TimezoneType timezone) { settings.TimeZone = timezone; return this; } public BulkDataRequestSetup WithFilter(Action filterSetup) { settings.Filter = filterSetup; return this; } public BulkDataRequestSetup WithFields(IEnumerable fields) { settings.Fields = fields; return this; } public BulkDataRequestSetup WithFields(string field, params string[] additionalFields) { settings.Fields = additionalFields.Concat(new[] { field }); return this; } public BulkDataRequestSetup WithAllFields() { settings.AllFields = true; return this; } public BulkDataRequestSetup WithCorrectedData(bool dataIncludesCorrections) { settings.CorrectedData = false; return this; } public BulkDataRequestSetup OnDataReceived(Action>> dataReceivedCallback) { settings.DataReceived = dataReceivedCallback; return this; } public BulkDataRequestSetup OnStatusUpdated(Action> statusUpdatedCallback) { settings.StatusUpdated = statusUpdatedCallback; return this; } public BulkDataRequest Create() { return new BulkDataRequest(settings); } public BulkDataRequest CreateAndSend() { return new BulkDataRequest(settings).Send(); } #endregion } public class BulkDataRequest { private readonly Dictionary requests = new Dictionary(); private readonly Dictionary> data = new Dictionary>(); private readonly Dictionary statuses = new Dictionary(); private Action>> DataReceived { get; set; } private Action> StatusUpdated { get; set; } private long counter; private DateTime min = new DateTime(2099,1,1); private DateTime max; public BulkDataRequest(BulkDataRequestSettings settings) { StatusUpdated = settings.StatusUpdated; DataReceived = settings.DataReceived; var ts = DataServices.Instance.TimeSeries; foreach (var ric in settings.Rics) { requests.Add(ric, ts.SetupDataRequest() .From(settings.From) .To(settings.To) .WithRic(ric) .WithFields(settings.Fields) .WithNumberOfPoints(settings.NumberOfPoints) .WithFeed(settings.Feed) .WithView(settings.View) .WithInterval(settings.Interval ?? new Interval(IntervalType.Daily, 1)) .WithTimeZone(settings.TimeZone) .WithAdjustedPrice(settings.AdjustedPrice) .WithCorrectedData(false) .OnDataReceived(DataReceivedCallback) .OnStatusUpdated(StatusUpdatedCallback) .Create()); } } private void StatusUpdatedCallback(IRequestStatus requestStatus) { statuses.Add(requestStatus.Ric, requestStatus); counter++; } private void DataReceivedCallback(DataChunk dataChunk) { Dictionary x; if (!data.ContainsKey(dataChunk.Ric)) { x = new Dictionary(); data.Add(dataChunk.Ric, x); } else { data.TryGetValue(dataChunk.Ric, out x); if (x == null) x = new Dictionary(); } foreach (var record in dataChunk.Records) { if (record.Timestamp.HasValue) { if (record.Timestamp.Value < min) min = record.Timestamp.Value; if (record.Timestamp.Value > max) max = record.Timestamp.Value; x.Add(record.Timestamp.Value, (double)record.AllFields["CLOSE"]); } } if (dataChunk.IsLast) { counter++; } if (counter != requests.Count) return; var output = new Dictionary>(); foreach (var entity in data) { var l = Range(min, max, data).Select(timestamp => entity.Value[timestamp]).ToList(); output.Add(entity.Key, l); } DataReceived(output); if (statuses.Any()) StatusUpdated(statuses); } private IEnumerable Range(DateTime from, DateTime to, Dictionary> entries) { var range = Enumerable.Range(0, to.Subtract(from).Days + 1).Select(d => from.AddDays(d)); var intersect = new List(); var drop = false; foreach (var d in range) { if (entries.Any(entry => !entry.Value.ContainsKey(d))) { drop = true; } if (!drop) intersect.Add(d); drop = false; } return intersect; } public BulkDataRequest Send() { counter = 0; foreach (var request in requests) request.Value.Send(); return this; } public BulkDataRequest Cancel() { foreach (var request in requests) request.Value.Cancel(); return this; } } }