/**
 * Realtime data push service.
 * @file realtimeService.js
 * @description Periodically checks for data changes and pushes them to clients over WebSocket.
 */

const cron = require('node-cron');
const { Device, Alert, Animal, Farm } = require('../models');
const { sequelize } = require('../config/database-simple');
const webSocketManager = require('../utils/websocket');
const notificationService = require('./notificationService');
const logger = require('../utils/logger');
const { Op } = require('sequelize');

class RealtimeService {
  constructor() {
    this.isRunning = false;
    // Per-topic watermark. For devices/alerts/animals it holds the timestamp of the
    // newest row already pushed (null until the first push); for stats it holds the
    // time of the last broadcast.
    this.lastUpdateTimes = {
      devices: null,
      alerts: null,
      animals: null,
      stats: null
    };
    // Device polling interval in seconds. It is interpolated into the seconds field
    // of a cron expression, so it must stay below 60.
    this.updateInterval = 30;
    // Tasks returned by cron.schedule(); kept so stop() can actually cancel them.
    this.scheduledTasks = [];
  }

  /**
   * Start the realtime push service by scheduling the four polling jobs.
   * Calling it while the service is already running only logs a warning.
   * All cron expressions use the six-field form whose first field is seconds.
   *
   * @throws Re-throws whatever cron.schedule throws; tasks created before the
   *   failure are stopped first so nothing keeps running in the background.
   */
  start() {
    if (this.isRunning) {
      logger.warn('实时数据推送服务已在运行中');
      return;
    }

    const jobs = [
      // Device status: every `updateInterval` seconds.
      [`*/${this.updateInterval} * * * * *`, () => { this.checkDeviceUpdates(); }],
      // Alerts: every 10 seconds (alerts are more urgent).
      ['*/10 * * * * *', () => { this.checkAlertUpdates(); }],
      // Animal health: once a minute. A step of 60 is outside the 0-59 range of
      // the seconds field, so the job is pinned to second 0 of every minute.
      ['0 * * * * *', () => { this.checkAnimalUpdates(); }],
      // System statistics: every 2 minutes. The step has to live in the minutes
      // field; a step of 120 in the seconds field can never mean "every 2 minutes".
      ['0 */2 * * * *', () => { this.updateSystemStats(); }]
    ];

    try {
      for (const [expression, job] of jobs) {
        this.scheduledTasks.push(cron.schedule(expression, job));
      }
    } catch (error) {
      // Do not leave a half-started service behind.
      this.stopScheduledTasks();
      throw error;
    }

    this.isRunning = true;
    logger.info('实时数据推送服务已启动');
    console.log(`实时数据推送服务已启动,更新间隔: ${this.updateInterval}秒`);
  }

  /**
   * Stop the realtime push service: cancel every scheduled job and clear the
   * running flag, so that a later start() does not create duplicate jobs.
   */
  stop() {
    this.stopScheduledTasks();
    this.isRunning = false;
    logger.info('实时数据推送服务已停止');
  }

  /**
   * Stop and forget every task created by start().
   */
  stopScheduledTasks() {
    for (const task of this.scheduledTasks) {
      task.stop();
    }
    this.scheduledTasks = [];
  }

  /**
   * Push every device whose `updated_at` is newer than the device watermark.
   * Before the first push the look-back window is the last minute.
   * Errors are logged and never propagate to the caller.
   */
  async checkDeviceUpdates() {
    try {
      const lastCheck = this.lastUpdateTimes.devices || new Date(Date.now() - 60000);

      const updatedDevices = await Device.findAll({
        where: { updated_at: { [Op.gt]: lastCheck } },
        include: [{ model: Farm, as: 'farm', attributes: ['id', 'name'] }],
        order: [['updated_at', 'DESC']]
      });

      if (updatedDevices.length === 0) {
        return;
      }

      logger.info(`检测到 ${updatedDevices.length} 个设备状态更新`);

      for (const device of updatedDevices) {
        webSocketManager.broadcastDeviceUpdate({
          id: device.id,
          name: device.name,
          type: device.type,
          status: device.status,
          farm_id: device.farm_id,
          farm_name: device.farm?.name,
          last_maintenance: device.last_maintenance,
          metrics: device.metrics,
          updated_at: device.updated_at
        });
      }

      // Advance the watermark to the newest row we actually saw (rows are sorted
      // newest first). Using "now" here would skip rows written between the query
      // and this assignment.
      this.lastUpdateTimes.devices = updatedDevices[0].updated_at ?? new Date();
    } catch (error) {
      logger.error('检查设备更新失败:', error);
    }
  }

  /**
   * Push every alert whose `created_at` is newer than the alert watermark and send
   * an urgent notification for `high` / `critical` ones.
   * Before the first push the look-back window is the last 30 seconds.
   * Errors are logged and never propagate to the caller.
   */
  async checkAlertUpdates() {
    try {
      const lastCheck = this.lastUpdateTimes.alerts || new Date(Date.now() - 30000);

      const newAlerts = await Alert.findAll({
        where: { created_at: { [Op.gt]: lastCheck } },
        include: [
          { model: Farm, as: 'farm', attributes: ['id', 'name', 'contact', 'phone'] },
          { model: Device, as: 'device', attributes: ['id', 'name', 'type'] }
        ],
        order: [['created_at', 'DESC']]
      });

      if (newAlerts.length === 0) {
        return;
      }

      logger.info(`检测到 ${newAlerts.length} 个新预警`);

      for (const alert of newAlerts) {
        webSocketManager.broadcastAlert({
          id: alert.id,
          type: alert.type,
          level: alert.level,
          message: alert.message,
          status: alert.status,
          farm_id: alert.farm_id,
          farm_name: alert.farm?.name,
          device_id: alert.device_id,
          device_name: alert.device?.name,
          created_at: alert.created_at
        });

        // High and critical alerts are notified immediately. The alert was just
        // broadcast above, so the notification step must not broadcast it again.
        if (alert.level === 'high' || alert.level === 'critical') {
          await this.sendUrgentNotification(alert, { broadcast: false });
        }
      }

      // Same watermark rule as for devices: newest row seen, not "now".
      this.lastUpdateTimes.alerts = newAlerts[0].created_at ?? new Date();
    } catch (error) {
      logger.error('检查预警更新失败:', error);
    }
  }

  /**
   * Push every animal record whose `updated_at` is newer than the animal watermark.
   * Before the first push the look-back window is the last 2 minutes.
   * Errors are logged and never propagate to the caller.
   */
  async checkAnimalUpdates() {
    try {
      const lastCheck = this.lastUpdateTimes.animals || new Date(Date.now() - 120000);

      const updatedAnimals = await Animal.findAll({
        where: { updated_at: { [Op.gt]: lastCheck } },
        include: [{ model: Farm, as: 'farm', attributes: ['id', 'name'] }],
        order: [['updated_at', 'DESC']]
      });

      if (updatedAnimals.length === 0) {
        return;
      }

      logger.info(`检测到 ${updatedAnimals.length} 个动物健康状态更新`);

      for (const animal of updatedAnimals) {
        webSocketManager.broadcastAnimalUpdate({
          id: animal.id,
          type: animal.type,
          count: animal.count,
          health_status: animal.health_status,
          farm_id: animal.farm_id,
          farm_name: animal.farm?.name,
          last_inspection: animal.last_inspection,
          updated_at: animal.updated_at
        });
      }

      // Same watermark rule as for devices: newest row seen, not "now".
      this.lastUpdateTimes.animals = updatedAnimals[0].updated_at ?? new Date();
    } catch (error) {
      logger.error('检查动物更新失败:', error);
    }
  }

  /**
   * Compute the system statistics and broadcast them.
   * Note that getSystemStats() reports a failure by returning an object with an
   * `error` field instead of throwing, and that object is broadcast as-is.
   */
  async updateSystemStats() {
    try {
      const stats = await this.getSystemStats();
      webSocketManager.broadcastStatsUpdate(stats);
      this.lastUpdateTimes.stats = new Date();
      logger.info('系统统计数据已推送');
    } catch (error) {
      logger.error('更新系统统计失败:', error);
    }
  }

  /**
   * Collect system-wide statistics.
   *
   * @returns {Promise<Object>} On success: `farmCount`, `deviceCount`,
   *   `animalCount` (sum of the `count` column), `alertCount` (active alerts),
   *   `deviceStatus` (status -> number of devices), `alertLevels`
   *   (level -> number of active alerts) and `timestamp`.
   *   On failure: `{ error, timestamp }`; the error is logged, not thrown.
   */
  async getSystemStats() {
    try {
      const [farmCount, deviceCount, animalCount, alertCount] = await Promise.all([
        Farm.count(),
        Device.count(),
        Animal.sum('count'),
        Alert.count({ where: { status: 'active' } })
      ]);

      const deviceStatusStats = await Device.findAll({
        attributes: ['status', [sequelize.fn('COUNT', sequelize.col('status')), 'count']],
        group: ['status']
      });

      const alertLevelStats = await Alert.findAll({
        where: { status: 'active' },
        attributes: ['level', [sequelize.fn('COUNT', sequelize.col('level')), 'count']],
        group: ['level']
      });

      // Turn grouped rows into a { <field value>: <count> } map. The aggregate is
      // read from dataValues and parsed as a base-10 integer.
      const toCountMap = (rows, field) =>
        rows.reduce((acc, item) => {
          acc[item[field]] = Number.parseInt(item.dataValues.count, 10);
          return acc;
        }, {});

      return {
        // `||` (not `??`) on purpose: any falsy aggregate result, NaN included,
        // is reported as 0.
        farmCount: farmCount || 0,
        deviceCount: deviceCount || 0,
        animalCount: animalCount || 0,
        alertCount: alertCount || 0,
        deviceStatus: toCountMap(deviceStatusStats, 'status'),
        alertLevels: toCountMap(alertLevelStats, 'level'),
        timestamp: new Date()
      };
    } catch (error) {
      logger.error('获取系统统计数据失败:', error);
      return { error: '获取统计数据失败', timestamp: new Date() };
    }
  }

  /**
   * Send an urgent alert notification.
   * Errors are logged and never propagate to the caller.
   *
   * @param {Object} alert Alert instance.
   * @param {Object} [options]
   * @param {boolean} [options.broadcast=true] Whether to also broadcast the alert
   *   over WebSocket. Pass `false` when the caller has already broadcast it.
   */
  async sendUrgentNotification(alert, options = {}) {
    const { broadcast = true } = options;

    try {
      logger.warn(`紧急预警: ${alert.message} (级别: ${alert.level})`);

      if (broadcast) {
        // Realtime WebSocket notification.
        webSocketManager.broadcastAlert({
          id: alert.id,
          type: alert.type,
          level: alert.level,
          message: alert.message,
          farm_id: alert.farm_id,
          farm_name: alert.farm?.name,
          created_at: alert.created_at
        });
      }

      // Email / SMS notification.
      const isUrgent = alert.level === 'critical' || alert.level === 'high';
      await notificationService.sendAlertNotification(alert, [], {
        urgent: isUrgent,
        includeSMS: alert.level === 'critical', // SMS only for critical alerts
        maxResponseTime: 300000 // 5 minute response time
      });

      logger.info(`预警通知已发送: 预警ID ${alert.id}, 紧急程度: ${isUrgent}`);
    } catch (error) {
      logger.error('发送紧急预警通知失败:', error);
    }
  }

  /**
   * Simulate a device data change (for demos): assign a random status and random
   * temperature / humidity metrics to the device. Does nothing when the device
   * does not exist. Errors are logged and never propagate to the caller.
   *
   * @param {*} deviceId Primary key of the device to change.
   */
  async simulateDeviceChange(deviceId) {
    try {
      const device = await Device.findByPk(deviceId);
      if (!device) return;

      // Pick a random status.
      const statuses = ['online', 'offline', 'maintenance'];
      const randomStatus = statuses[Math.floor(Math.random() * statuses.length)];

      await device.update({
        status: randomStatus,
        metrics: {
          temperature: Math.random() * 10 + 20, // 20-30 degrees
          humidity: Math.random() * 20 + 50, // 50-70 %
          lastUpdate: new Date()
        }
      });

      logger.info(`模拟设备 ${deviceId} 状态变化为: ${randomStatus}`);
    } catch (error) {
      logger.error('模拟设备变化失败:', error);
    }
  }

  /**
   * Get the service status.
   *
   * @returns {Object} `isRunning`, `lastUpdateTimes`, `updateInterval` and
   *   `connectedClients` (whatever webSocketManager.getConnectionStats() returns).
   */
  getStatus() {
    return {
      isRunning: this.isRunning,
      lastUpdateTimes: this.lastUpdateTimes,
      updateInterval: this.updateInterval,
      connectedClients: webSocketManager.getConnectionStats()
    };
  }
}

// Singleton instance.
const realtimeService = new RealtimeService();

module.exports = realtimeService;