最近工作中我们新增了一批人脸识别设备，该设备支持 HTTP Comet 轮询。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
/// <summary>
/// A stored message together with the bookkeeping used when publishing it.
/// </summary>
/// <remarks>
/// All non-nullable string properties use the same <c>= default!</c> initializer; they are
/// expected to be filled in by whoever creates the record.
/// </remarks>
public class MediumMessage
{
    /// <summary>Identifier of the stored record (presumably assigned by the data store; confirm).</summary>
    public string DbId { get; set; } = default!;

    /// <summary>NOTE(review): meaning is not visible in this file; presumably the source of the message.</summary>
    public string Origin { get; set; } = default!;

    /// <summary>Topic the message is published to.</summary>
    public string Topic { get; set; } = default!;

    /// <summary>Identifier used to look the message up and to log it.</summary>
    public string MessageId { get; set; } = default!;

    /// <summary>Payload that is sent when the message is published.</summary>
    public string Message { get; set; } = default!;

    /// <summary>Time the message was added.</summary>
    public DateTime AddAt { get; set; }

    /// <summary>Time of the last publish, or <c>null</c> when the message has not been published.</summary>
    public DateTime? PublishAt { get; set; }

    /// <summary>Number of attempts recorded for this message.</summary>
    public int Tries { get; set; }

    /// <summary>Current publish status of the message.</summary>
    public StatusName StatusName { get; set; }
}
/// <summary>
/// Publish status of a stored message.
/// </summary>
public enum StatusName
{
    /// <summary>Marked as failed.</summary>
    Failed = -1,

    /// <summary>Waiting to be published; this is the default value (0).</summary>
    Scheduled = 0,

    /// <summary>Has been published.</summary>
    Published = 1,
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
/// <summary>
/// Storage for whitelist messages that are waiting to be published, retried or removed.
/// </summary>
public interface IDataStore
{
    // ----- single-message operations -----

    /// <summary>Stores a message for the given whitelist task and returns the stored record.</summary>
    /// <param name="messageId">Identifier of the message.</param>
    /// <param name="topic">Topic the message belongs to.</param>
    /// <param name="message">Message payload.</param>
    /// <param name="task">Whitelist task the message was created for.</param>
    /// <param name="status">Status to store the message with.</param>
    MediumMessage StoreWhitelistMessage(
        string messageId,
        string topic,
        string message,
        WhitelistTask task,
        StatusName status);

    /// <summary>Returns the message with the given identifier.</summary>
    /// <remarks>NOTE(review): behaviour for an unknown identifier is not visible here.</remarks>
    MediumMessage GetMessage(string messageId);

    /// <summary>Removes the message with the given identifier.</summary>
    void RemoveMessage(string messageId);

    /// <summary>Updates the publish state of a message; called after the message has been published.</summary>
    void ChangePublishState(string messageId);

    // ----- queries -----

    /// <summary>Tells whether the store currently holds no whitelist messages.</summary>
    bool IsWhitelistEmpty();

    /// <summary>Tells whether a published message already exists for <paramref name="topic"/>.</summary>
    bool ExistPublishedTopic(string topic);

    /// <summary>Returns the messages that should be published (or published again) now.</summary>
    IEnumerable<MediumMessage> GetMessagesOfNeedRetry();

    // ----- maintenance -----

    /// <summary>Deletes expired messages.</summary>
    /// <returns>The number of messages that were deleted.</returns>
    int DeleteExpiredMessages();
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| public class FaceWhitelistWorker{ private readonly MqttClientService _mqttService; private readonly ILogger _logger; private readonly IApiClient _api; private readonly IDataStore _dataStore;
/// <summary>
/// Creates the worker and keeps references to the services it works with.
/// </summary>
public FaceWhitelistWorker(
    MqttClientService mqttService,
    ILogger<FaceWhitelistWorker> logger,
    IApiClient api,
    IDataStore dataStore)
{
    (_mqttService, _logger, _api, _dataStore) = (mqttService, logger, api, dataStore);
}

/// <summary>
/// Starts the schedule, publish and clear loops, in that order, awaiting the task that each
/// start method returns before starting the next one.
/// </summary>
/// <param name="cancellationToken">Token handed to every loop.</param>
public async Task StartAsync(CancellationToken cancellationToken)
{
    await StartSchedule(cancellationToken);

    await StartPublish(cancellationToken);

    await StartClear(cancellationToken);
}
/// <summary>
/// Starts the background loop that periodically asks the data store to delete expired messages.
/// </summary>
/// <param name="ct">Token that stops the loop.</param>
/// <returns>
/// The task returned by <c>Task.Factory.StartNew</c>. Because the delegate is asynchronous, that
/// task completes when the loop reaches its first <c>await</c>, not when the loop ends.
/// </returns>
private Task StartClear(CancellationToken ct)
{
    const int clearInterval = 3000; // milliseconds between clean-up passes

    return Task.Factory.StartNew(async () =>
    {
        while (!ct.IsCancellationRequested)
        {
            try
            {
                var count = _dataStore.DeleteExpiredMessages();
                if (count > 0)
                {
                    _logger.LogDebug("Cleared {Count} messages", count);
                }
            }
            catch (System.Exception ex)
            {
                // The loop itself is never awaited, so an escaping exception would end the
                // clean-up without any trace. Log it and try again on the next pass.
                _logger.LogError(ex, "Failed to clear expired messages");
            }

            try
            {
                await Task.Delay(clearInterval, ct);
            }
            catch (System.OperationCanceledException)
            {
                // Cancellation was requested while waiting: stop quietly.
                return;
            }
        }
    }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
/// <summary>
/// Starts the background loop that publishes pending messages over MQTT.
/// </summary>
/// <remarks>
/// Each pass waits until the MQTT client is connected, asks the data store for the messages
/// that need publishing and, for every message whose topic has no published message yet,
/// publishes it and updates its publish state. Messages for a topic that already has a
/// published message are skipped on this pass.
/// </remarks>
/// <param name="ct">Token that stops the loop.</param>
/// <returns>
/// The task returned by <c>Task.Factory.StartNew</c>. Because the delegate is asynchronous, that
/// task completes when the loop reaches its first <c>await</c>, not when the loop ends.
/// </returns>
private Task StartPublish(CancellationToken ct)
{
    const int pollInterval = 500;      // milliseconds between passes
    const int connectionWait = 5000;   // milliseconds to wait at start-up and while disconnected

    return Task.Factory.StartNew(async () =>
    {
        try
        {
            await Task.Delay(connectionWait, ct);

            while (!ct.IsCancellationRequested)
            {
                try
                {
                    if (_mqttService.MqttClient is not { IsConnected: true })
                    {
                        await Task.Delay(connectionWait, ct);
                        continue;
                    }

                    // Take one snapshot: the store may hand back a deferred query, and it is
                    // inspected twice below (emptiness check, then the loop).
                    var pending = _dataStore.GetMessagesOfNeedRetry()?.ToList();
                    if (pending == null || pending.Count == 0)
                    {
                        await Task.Delay(pollInterval, ct);
                        continue;
                    }

                    foreach (var mediumMessage in pending)
                    {
                        if (_dataStore.ExistPublishedTopic(mediumMessage.Topic))
                        {
                            _logger.LogDebug("Skip message {Id} with topic {Topic}", mediumMessage.MessageId, mediumMessage.Topic);
                            continue;
                        }

                        var mqttMessage = new MqttApplicationMessageBuilder()
                            .WithTopic(mediumMessage.Topic)
                            .WithPayload(mediumMessage.Message)
                            .WithAtMostOnceQoS()
                            .Build();

                        await _mqttService.PublishAsync(mqttMessage);
                        _dataStore.ChangePublishState(mediumMessage.MessageId);
                        _logger.LogDebug("Publish message {Id} with topic {Topic}", mediumMessage.MessageId, mediumMessage.Topic);
                    }
                }
                catch (System.Exception ex) when (!(ex is System.OperationCanceledException && ct.IsCancellationRequested))
                {
                    // The loop itself is never awaited, so an escaping exception would stop
                    // publishing without any trace. Log it and retry on the next pass.
                    _logger.LogError(ex, "Failed to publish pending messages");
                }

                await Task.Delay(pollInterval, ct);
            }
        }
        catch (System.OperationCanceledException)
        {
            // Cancellation was requested while waiting: stop quietly.
        }
    }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
/// <summary>
/// Starts the background loop that fetches whitelist tasks and processes them.
/// </summary>
/// <remarks>
/// A pass only proceeds when the MQTT client is connected and the data store reports an empty
/// whitelist; otherwise the loop waits and checks again. Fetched tasks are dispatched by their
/// action to <c>ProcessDeleteAction</c> or <c>ProcessAddAction</c>.
/// </remarks>
/// <param name="ct">Token that stops the loop.</param>
/// <returns>
/// The task returned by <c>Task.Factory.StartNew</c>. Because the delegate is asynchronous, that
/// task completes when the loop reaches its first <c>await</c>, not when the loop ends.
/// </returns>
private Task StartSchedule(CancellationToken ct)
{
    return Task.Factory.StartNew(async () =>
    {
        const int waitInterval = 5000;    // milliseconds between checks
        const int emptyPageWait = 10000;  // milliseconds to wait when there is nothing to process

        try
        {
            await Task.Delay(waitInterval, ct);

            while (!ct.IsCancellationRequested)
            {
                try
                {
                    if (_mqttService.MqttClient is not { IsConnected: true })
                    {
                        await Task.Delay(waitInterval, ct);
                        continue;
                    }

                    // Messages from the previous batch are still in the store: wait for them.
                    if (!_dataStore.IsWhitelistEmpty())
                    {
                        await Task.Delay(waitInterval, ct);
                        continue;
                    }

                    // NOTE(review): `page` is never assigned from `_api` in the visible code, so
                    // the processing branch below cannot run as written. Presumably the fetch
                    // call was elided; restore it here.
                    Pageable<WhitelistTask> page = null;
                    if (page?.Data == null || page.Data.Count == 0)
                    {
                        await Task.Delay(emptyPageWait, ct);
                        continue;
                    }

                    _logger.LogDebug("Fetched whitelist tasks from api, count:{Count}", page.Data.Count);

                    foreach (var task in page.Data)
                    {
                        if (ct.IsCancellationRequested)
                        {
                            return;
                        }

                        if (task.Action == WhitelistTaskAction.Delete)
                        {
                            await ProcessDeleteAction(task);
                        }
                        else if (task.Action == WhitelistTaskAction.Add)
                        {
                            await ProcessAddAction(task);
                        }
                    }
                }
                catch (System.Exception ex) when (!(ex is System.OperationCanceledException && ct.IsCancellationRequested))
                {
                    // The loop itself is never awaited, so an escaping exception would stop
                    // scheduling without any trace. Log it, back off, and try again.
                    _logger.LogError(ex, "Failed to schedule whitelist tasks");
                    await Task.Delay(waitInterval, ct);
                }
            }
        }
        catch (System.OperationCanceledException)
        {
            // Cancellation was requested while waiting: stop quietly.
        }
    }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
1 2 3 4 5 6 7 8 9 10 11 12
/// <summary>
/// Returns, for each topic, the oldest message that should be published now.
/// </summary>
/// <remarks>
/// A message qualifies when it has fewer than <c>MaxTryCount</c> tries and is either scheduled
/// or failed, or is published and its last publish is at least <c>RetryInternal</c>
/// milliseconds old. Qualifying messages are ordered by <c>AddAt</c> and the first one of every
/// topic is kept.
/// </remarks>
/// <returns>
/// A snapshot list. The query is evaluated once here, so callers that enumerate the result
/// more than once see the same messages each time.
/// </returns>
public IEnumerable<MediumMessage> GetMessagesOfNeedRetry()
{
    // Read the clock once so every message is compared against the same instant.
    var now = DateTime.Now;

    return WhitelistMessages.Values
        .Where(x => x.Tries < MaxTryCount
            && (x.StatusName is StatusName.Scheduled or StatusName.Failed
                || (x.StatusName == StatusName.Published
                    // A null PublishAt makes the comparison false, so the message is excluded.
                    && x.PublishAt?.AddMilliseconds(RetryInternal) <= now)))
        .OrderBy(t => t.AddAt)
        .GroupBy(t => t.Topic)
        .Select(group => group.First())
        .ToList();
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| [12:19:54 DBG] Fetched whitelist tasks from api, count:5 [12:19:54 DBG] Publish message EditPerson-951-1494527067538440192 with topic dev/face/2042253 [12:19:54 DBG] Publish message EditPerson-965-1494527067840430080 with topic dev/face/11111 [12:19:55 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:19:55 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:19:55 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:55 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:55 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:55 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:56 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:56 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:56 DBG] Publish message EditPerson-951-1494527067538440192 with topic dev/face/2042253 [12:19:56 DBG] Publish message EditPerson-965-1494527067840430080 with topic dev/face/11111 [12:19:57 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:57 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:57 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:19:57 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:19:57 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:57 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:58 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:58 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:58 DBG] Publish message EditPerson-951-1494527067538440192 with topic dev/face/2042253 [12:19:59 DBG] Publish message EditPerson-965-1494527067840430080 with topic dev/face/11111 [12:19:59 
INF] Mqtt message received with topic: dev/face/2042253/heartbeat [12:19:59 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:19:59 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:19:59 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:19:59 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:00 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:20:00 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:20:00 DBG] Skip message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:20:00 DBG] Skip message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:20:01 DBG] Publish message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:20:01 DBG] Publish message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:20:01 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:01 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:20:01 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:02 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:02 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:03 DBG] Publish message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:20:03 DBG] Publish message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:20:03 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:03 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:20:03 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:03 DBG] Cleared 2 messages [12:20:04 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:04 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 
[12:20:05 DBG] Publish message EditPerson-954-1494527067689435136 with topic dev/face/2042253 [12:20:05 DBG] Publish message EditPerson-968-1494527068167585792 with topic dev/face/11111 [12:20:05 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:20:05 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:05 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:06 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:06 DBG] Skip message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:07 DBG] Publish message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:07 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:20:07 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:09 DBG] Publish message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:09 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:20:09 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:09 DBG] Cleared 2 messages [12:20:11 DBG] Publish message EditPerson-957-1494527067722989568 with topic dev/face/2042253 [12:20:11 INF] Mqtt message received with topic: dev/face/2042253/Ack [12:20:11 WRN] Cannot find message handler for topic: dev/face/2042253/Ack [12:20:16 DBG] Cleared 1 messages [12:20:19 INF] Mqtt message received with topic: dev/face/2042253/heartbeat [12:20:19 DBG] Fetched whitelist tasks from api, count:5