21CTO导读:各位朋友们,越来越多的应用开始倾向于技术栈的简洁,特别是PostgreSQL已经将众多NoSQL技术特性融合一身的情况下。有类似的场景可以参考本文。
// Cache API responses for one hour
await redis.set(`user:${id}`, JSON.stringify(user), 'EX', 3600);
// Real-time notifications
redis.publish('notifications', JSON.stringify({ userId, message }));
// Using Bull/BullMQ
queue.add('send-email', { to, subject, body });
// Update the database
await db.query('UPDATE users SET name = $1 WHERE id = $2', [name, id]);
// Invalidate the stale cache entry
await redis.del(`user:${id}`);
// ⚠️ Must also handle: Redis being down
// ⚠️ Must also handle: invalidation failure / retry
// Cache-vs-database consistency logic lives here
// Redis version: session blob with a one-hour TTL
await redis.set('session:abc123', JSON.stringify(sessionData), 'EX', 3600);

-- Postgres version: UNLOGGED skips WAL, trading crash-durability for speed
-- (acceptable for a cache that can be rebuilt)
CREATE UNLOGGED TABLE cache (
    key TEXT PRIMARY KEY,
    value JSONB NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL
);
-- Index supports the periodic expiry sweep
CREATE INDEX idx_cache_expires ON cache (expires_at);
-- Upsert a cache entry with a one-hour TTL
INSERT INTO cache (key, value, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '1 hour')
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value,
    expires_at = EXCLUDED.expires_at;
-- Read a non-expired cache entry
SELECT value FROM cache WHERE key = $1 AND expires_at > NOW();

-- Evict expired entries (run periodically, e.g. from pg_cron)
DELETE FROM cache WHERE expires_at < NOW();

// Redis pub/sub equivalent:
// Publisher
redis.publish('notifications', JSON.stringify({ userId: 123, msg: 'Hello' }));
// Subscriber
redis.subscribe('notifications');
redis.on('message', (channel, message) => {
  console.log(message);
});
-- Publisher
NOTIFY notifications, '{"userId": 123, "msg": "Hello"}';

// Subscriber (Node.js with pg)
const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();
await client.query('LISTEN notifications');
client.on('notification', (msg) => {
  const payload = JSON.parse(msg.payload);
  console.log(payload);
});
现实世界的例子
// When a new log arrives: write it, then fan out over Redis
await db.query('INSERT INTO logs ...');
await redis.publish('logs:new', JSON.stringify(log));
// Frontend listens
redis.subscribe('logs:new');
-- Fire a NOTIFY carrying the new row as JSON on every insert into logs,
-- replacing the separate redis.publish() step entirely
CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('logs_new', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER log_inserted
AFTER INSERT ON logs
FOR EACH ROW EXECUTE FUNCTION notify_new_log();
// Frontend stream (via Server-Sent Events): bridge LISTEN -> SSE
app.get('/logs/stream', async (req, res) => {
  const client = await pool.connect();
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  });
  await client.query('LISTEN logs_new');
  client.on('notification', (msg) => {
    res.write(`data:${msg.payload}\n\n`);
  });
  // NOTE(review): client is never released back to the pool; release it
  // on req close in production code
});
// Enqueue a job and register its worker (Bull/BullMQ style)
queue.add('send-email', { to, subject, body });
queue.process('send-email', async (job) => {
  await sendEmail(job.data);
});
-- Postgres-native job queue table
CREATE TABLE jobs (
    id BIGSERIAL PRIMARY KEY,
    queue TEXT NOT NULL,
    payload JSONB NOT NULL,
    attempts INT DEFAULT 0,
    max_attempts INT DEFAULT 3,
    scheduled_at TIMESTAMPTZ DEFAULT NOW(),
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Partial index covering only jobs that can still be attempted
-- NOTE(review): predicate was truncated in the source article;
-- "attempts < max_attempts" is the reconstructed condition — confirm
CREATE INDEX idx_jobs_queue ON jobs (queue, scheduled_at)
WHERE attempts < max_attempts;
-- Enqueue a job
INSERT INTO jobs (queue, payload)
VALUES ('send-email', '{"to": "user@example.com", "subject": "Hi"}');
-- Atomically claim the next due job; SKIP LOCKED makes this safe
-- under many concurrent workers (no double-processing, no blocking)
WITH next_job AS (
    SELECT id
    FROM jobs
    WHERE queue = $1
      -- NOTE(review): condition truncated in the source; reconstructed
      AND attempts < max_attempts
      AND scheduled_at <= NOW()
    ORDER BY scheduled_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET attempts = attempts + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING *;
// Fixed-window rate limit in Redis: first hit starts the 60s window
const key = `ratelimit:${userId}`;
const count = await redis.incr(key);
if (count === 1) {
  await redis.expire(key, 60); // 60 seconds
}
if (count > 100) {
  throw new Error('Rate limit exceeded');
}
CREATE TABLE rate_limits(user_id INT PRIMARY KEY,request_count INT DEFAULT 0,window_start TIMESTAMPTZ DEFAULT NOW());-- Check and incrementWITH current AS(SELECTrequest_count,CASEWHEN window_start
-- NOTE(review): the check-and-increment query above is truncated mid-CASE by the
-- article extraction; the remainder of the fixed-window logic is missing from the
-- source and cannot be safely reconstructed here.
-- Sliding-window rate limiting: one row per request
CREATE TABLE api_requests (
    user_id INT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Check rate limit (requests in the last minute)
SELECT COUNT(*) FROM api_requests
WHERE user_id = $1 AND created_at > NOW() - INTERVAL '1 minute';

-- If under limit, record this request
INSERT INTO api_requests (user_id) VALUES ($1);

-- Cleanup old requests periodically
-- NOTE(review): the tail of this statement was mangled by HTML extraction;
-- interval reconstructed — confirm against the original article
DELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';
// Redis version: 24-hour session TTL
await redis.set(`session:${sessionId}`, JSON.stringify(sessionData), 'EX', 86400);

-- Postgres version
CREATE TABLE sessions (
    id TEXT PRIMARY KEY,
    data JSONB NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_sessions_expires ON sessions (expires_at);

-- Insert/Update
INSERT INTO sessions (id, data, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '24 hours')
ON CONFLICT (id) DO UPDATE
SET data = EXCLUDED.data,
    expires_at = EXCLUDED.expires_at;

-- Read (ignores expired rows)
SELECT data FROM sessions WHERE id = $1 AND expires_at > NOW();
-- Find all sessions for a specific user (JSONB ->> extracts as text)
SELECT * FROM sessions WHERE data->>'userId' = '123';

-- Find sessions with a specific role (nested JSONB path)
SELECT * FROM sessions WHERE data->'user'->>'role' = 'admin';
| 操作 | Redis | PostgreSQL | 不同之处 |
|---|---|---|---|
| 缓存集 | 0.05毫秒 | 0.08毫秒 | 速度降低 60% |
| 缓存获取 | 0.04毫秒 | 0.06毫秒 | 速度降低 50% |
| 发布/订阅 | 1.2毫秒 | 3.1毫秒 | 速度降低 158% |
| 队列推送 | 0.08毫秒 | 0.15毫秒 | 速度降低 87% |
| 队列弹出 | 0.12毫秒 | 0.31毫秒 | 速度降低 158% |
// Redis stack: each step is a separate network round-trip
await db.query('INSERT INTO posts ...');   // 2ms
await redis.del('posts:latest');           // 1ms (network hop)
await redis.publish('posts:new', data);    // 1ms (network hop)
// Total: ~4ms
-- Postgres-only stack: everything in one transaction on one connection
BEGIN;
INSERT INTO posts ...;                          -- 2ms
DELETE FROM cache WHERE key = 'posts:latest';   -- 0.1ms (same connection)
NOTIFY posts_new, '...';                        -- 0.1ms (same connection)
COMMIT;
-- Total: ~2.2ms
-- Leaderboard in Postgres (slower than a Redis sorted set)
SELECT user_id, score
FROM leaderboard
ORDER BY score DESC
LIMIT 10;

-- vs Redis sorted set:
--   ZREVRANGE leaderboard 0 9 WITHSCORES
// Migration phase 1: write to both stores, keep reading from Redis
await redis.set(key, value);
await pg.query('INSERT INTO cache ...');
// Read from Redis (still primary)
let data = await redis.get(key);
// Migration phase 2: Postgres is primary, Redis is the fallback
let data = await pg.query('SELECT value FROM cache WHERE key = $1', [key]);
// Fallback to Redis
if (!data) {
  data = await redis.get(key);
}
// Migration phase 3: Postgres only — Redis can be retired
await pg.query('INSERT INTO cache ...');
你的体验如何?欢迎在评论区分享基准测试结果。
作者:行动中的大雄
本篇文章为 @ 场长 创作并授权 21CTO 发布,未经许可,请勿转载。
内容授权事宜请您联系 webmaster@21cto.com 或关注 21CTO 微信公众号。
该文观点仅代表作者本人,21CTO 平台仅提供信息存储空间服务。
请扫描二维码,使用微信支付哦。