270 lines
8.3 KiB
JavaScript
270 lines
8.3 KiB
JavaScript
/**
|
||
* 数据库连接池配置
|
||
* @file database-pool.js
|
||
* @description 配置和管理Sequelize数据库连接池
|
||
*/
|
||
const { Sequelize } = require('sequelize');
|
||
const { EventEmitter } = require('events');
|
||
const logger = require('../utils/logger');
|
||
const ormConfig = require('./orm-config');
|
||
|
||
// 从环境变量获取数据库连接参数
|
||
const DB_DIALECT = process.env.DB_DIALECT || 'mysql';
|
||
const DB_STORAGE = process.env.DB_STORAGE || './database.sqlite';
|
||
const DB_NAME = process.env.DB_NAME || 'nxxmdata';
|
||
const DB_USER = process.env.DB_USER || 'root';
|
||
const DB_PASSWORD = process.env.DB_PASSWORD || 'aiotAiot123!';
|
||
const DB_HOST = process.env.DB_HOST || '129.211.213.226';
|
||
const DB_PORT = process.env.DB_PORT || 9527;
|
||
|
||
// 数据库连接池事件发射器
|
||
class DatabasePoolEmitter extends EventEmitter {}
|
||
const poolEvents = new DatabasePoolEmitter();
|
||
|
||
// 默认连接池配置
|
||
const DEFAULT_POOL_CONFIG = {
|
||
max: parseInt(process.env.DB_POOL_MAX || '10'), // 最大连接数
|
||
min: parseInt(process.env.DB_POOL_MIN || '2'), // 最小连接数
|
||
acquire: parseInt(process.env.DB_POOL_ACQUIRE || '30000'), // 获取连接超时时间(毫秒)
|
||
idle: parseInt(process.env.DB_POOL_IDLE || '10000'), // 连接空闲多久后释放(毫秒)
|
||
evict: parseInt(process.env.DB_POOL_EVICT || '1000'), // 多久检查一次空闲连接(毫秒)
|
||
};
|
||
|
||
// 创建Sequelize实例
|
||
let sequelize;
|
||
if (DB_DIALECT === 'sqlite') {
|
||
sequelize = new Sequelize({
|
||
dialect: 'sqlite',
|
||
storage: DB_STORAGE,
|
||
logging: (msg) => logger.debug(msg),
|
||
benchmark: process.env.NODE_ENV !== 'production',
|
||
pool: DEFAULT_POOL_CONFIG,
|
||
define: ormConfig.defaultModelOptions
|
||
});
|
||
} else {
|
||
sequelize = new Sequelize(DB_NAME, DB_USER, DB_PASSWORD, {
|
||
host: DB_HOST,
|
||
port: DB_PORT,
|
||
dialect: DB_DIALECT,
|
||
logging: (msg) => logger.debug(msg),
|
||
benchmark: process.env.NODE_ENV !== 'production',
|
||
pool: DEFAULT_POOL_CONFIG,
|
||
define: ormConfig.defaultModelOptions,
|
||
dialectOptions: {
|
||
charset: 'utf8mb4',
|
||
supportBigNumbers: true,
|
||
bigNumberStrings: true,
|
||
dateStrings: true,
|
||
multipleStatements: process.env.DB_MULTIPLE_STATEMENTS === 'true'
|
||
},
|
||
timezone: '+08:00'
|
||
});
|
||
}
|
||
|
||
// 监听连接池事件 - 使用Sequelize实例的hooks
|
||
sequelize.addHook('afterConnect', (connection, config) => {
|
||
logger.info(`数据库连接已建立`);
|
||
poolEvents.emit('connect', connection);
|
||
});
|
||
|
||
sequelize.addHook('beforeDisconnect', (connection) => {
|
||
logger.info(`数据库连接即将断开`);
|
||
poolEvents.emit('disconnect', connection);
|
||
});
|
||
|
||
// 注意:acquire和release事件在新版Sequelize中需要通过其他方式监听
|
||
// 这里我们使用定时器来监控连接池状态
|
||
setInterval(() => {
|
||
if (sequelize.connectionManager && sequelize.connectionManager.pool) {
|
||
const pool = sequelize.connectionManager.pool;
|
||
poolEvents.emit('poolStatus', {
|
||
size: pool.size || 0,
|
||
available: pool.available || 0,
|
||
using: pool.using || 0,
|
||
waiting: pool.waiting || 0
|
||
});
|
||
}
|
||
}, 30000); // 每30秒检查一次连接池状态
|
||
|
||
// 测试数据库连接
|
||
async function testConnection() {
|
||
try {
|
||
await sequelize.authenticate();
|
||
logger.info('数据库连接测试成功');
|
||
poolEvents.emit('connectionSuccess');
|
||
return { success: true, message: '数据库连接测试成功' };
|
||
} catch (error) {
|
||
logger.error('数据库连接测试失败:', error);
|
||
poolEvents.emit('connectionError', error);
|
||
return { success: false, message: error.message };
|
||
}
|
||
}
|
||
|
||
// 获取连接池状态
|
||
async function getPoolStatus() {
|
||
try {
|
||
const pool = sequelize.connectionManager.pool;
|
||
if (!pool) {
|
||
return { error: '连接池未初始化' };
|
||
}
|
||
|
||
// 获取连接池统计信息
|
||
const status = {
|
||
all: pool.size, // 所有连接数
|
||
idle: pool.idleCount, // 空闲连接数
|
||
used: pool.size - pool.idleCount, // 使用中的连接数
|
||
waiting: pool.pending, // 等待连接的请求数
|
||
max: pool.options.max, // 最大连接数
|
||
min: pool.options.min, // 最小连接数
|
||
acquire: pool.options.acquire, // 获取连接超时时间
|
||
idle: pool.options.idle, // 空闲超时时间
|
||
created: new Date().toISOString(), // 状态创建时间
|
||
utilization: (pool.size > 0) ? ((pool.size - pool.idleCount) / pool.size) * 100 : 0 // 利用率
|
||
};
|
||
|
||
poolEvents.emit('poolStatus', status);
|
||
return status;
|
||
} catch (error) {
|
||
logger.error('获取连接池状态失败:', error);
|
||
return { error: error.message };
|
||
}
|
||
}
|
||
|
||
// 监控连接池
|
||
async function monitorPool(interval = 60000) {
|
||
try {
|
||
const status = await getPoolStatus();
|
||
logger.debug('连接池状态:', status);
|
||
|
||
// 检查连接池利用率
|
||
if (status.utilization > 80) {
|
||
logger.warn(`连接池利用率过高: ${status.utilization.toFixed(2)}%`);
|
||
poolEvents.emit('highUtilization', status);
|
||
}
|
||
|
||
// 检查等待连接的请求数
|
||
if (status.waiting > 5) {
|
||
logger.warn(`连接池等待请求过多: ${status.waiting}`);
|
||
poolEvents.emit('highWaiting', status);
|
||
}
|
||
|
||
return status;
|
||
} catch (error) {
|
||
logger.error('监控连接池失败:', error);
|
||
return { error: error.message };
|
||
}
|
||
}
|
||
|
||
// 关闭连接池
|
||
async function closePool() {
|
||
try {
|
||
await sequelize.close();
|
||
logger.info('数据库连接池已关闭');
|
||
poolEvents.emit('poolClosed');
|
||
return { success: true, message: '数据库连接池已关闭' };
|
||
} catch (error) {
|
||
logger.error('关闭数据库连接池失败:', error);
|
||
return { success: false, message: error.message };
|
||
}
|
||
}
|
||
|
||
// 重置连接池
|
||
async function resetPool() {
|
||
try {
|
||
await closePool();
|
||
|
||
// 重新初始化连接池
|
||
sequelize.connectionManager.initPools();
|
||
|
||
// 测试新的连接池
|
||
const testResult = await testConnection();
|
||
|
||
if (testResult.success) {
|
||
logger.info('数据库连接池已重置');
|
||
poolEvents.emit('poolReset');
|
||
return { success: true, message: '数据库连接池已重置' };
|
||
} else {
|
||
throw new Error(testResult.message);
|
||
}
|
||
} catch (error) {
|
||
logger.error('重置数据库连接池失败:', error);
|
||
poolEvents.emit('poolResetError', error);
|
||
return { success: false, message: error.message };
|
||
}
|
||
}
|
||
|
||
// 优化连接池配置
|
||
async function optimizePool(config = {}) {
|
||
try {
|
||
// 获取当前状态
|
||
const currentStatus = await getPoolStatus();
|
||
|
||
// 计算新的配置
|
||
const newConfig = {
|
||
max: config.max || Math.max(currentStatus.max, Math.ceil(currentStatus.used * 1.5)),
|
||
min: config.min || Math.min(currentStatus.min, Math.floor(currentStatus.used * 0.5)),
|
||
acquire: config.acquire || currentStatus.acquire,
|
||
idle: config.idle || currentStatus.idle
|
||
};
|
||
|
||
// 确保最小连接数不小于1
|
||
newConfig.min = Math.max(newConfig.min, 1);
|
||
|
||
// 确保最大连接数不小于最小连接数
|
||
newConfig.max = Math.max(newConfig.max, newConfig.min);
|
||
|
||
// 应用新配置
|
||
await closePool();
|
||
|
||
// 更新连接池配置
|
||
sequelize.options.pool = newConfig;
|
||
|
||
// 重新初始化连接池
|
||
sequelize.connectionManager.initPools();
|
||
|
||
// 测试新的连接池
|
||
const testResult = await testConnection();
|
||
|
||
if (testResult.success) {
|
||
logger.info('数据库连接池已优化:', newConfig);
|
||
poolEvents.emit('poolOptimized', newConfig);
|
||
return { success: true, message: '数据库连接池已优化', config: newConfig };
|
||
} else {
|
||
throw new Error(testResult.message);
|
||
}
|
||
} catch (error) {
|
||
logger.error('优化数据库连接池失败:', error);
|
||
poolEvents.emit('poolOptimizationError', error);
|
||
return { success: false, message: error.message };
|
||
}
|
||
}
|
||
|
||
// 获取数据库表列表
|
||
async function getTablesList() {
|
||
try {
|
||
const [results] = await sequelize.query(
|
||
`SELECT TABLE_NAME FROM information_schema.TABLES
|
||
WHERE TABLE_SCHEMA = ?
|
||
ORDER BY TABLE_NAME`,
|
||
{ replacements: [DB_NAME] }
|
||
);
|
||
|
||
return results.map(row => row.TABLE_NAME);
|
||
} catch (error) {
|
||
logger.error('获取数据库表列表失败:', error);
|
||
return [];
|
||
}
|
||
}
|
||
|
||
// 导出模块
|
||
module.exports = {
|
||
sequelize,
|
||
testConnection,
|
||
getPoolStatus,
|
||
monitorPool,
|
||
closePool,
|
||
resetPool,
|
||
optimizePool,
|
||
getTablesList,
|
||
events: poolEvents
|
||
}; |