[NestJS] ระบบ Queue แบบ Request-Reply ง่ายๆด้วย NestJS
สมมติว่ามีข้อมูลที่ต้องส่งไปให้ Server ก้อนใหญ่สักก้อนนึง เช่น 100,000 ข้อความ (ลองคิดดูว่า ถ้าต้องส่งข้อมูล จำนวน 100,000 ข้อความไปให้ Serverในครั้งเดียว Server ก็คงทำงานไม่ทันแน่ๆ) หรือ Server ถูกกำหนดเอาไว้ว่าจะรับ Message เพียงแค่ 1,000 ข้อความนั้นนะ เพราะทำงานไม่ไหวแน่ๆ
จากโจทย์แบบนี้ เราจะทำอย่างไรได้บ้างในบทความนี้จะนำ NestJs มาแก้ปัญหากัน
วิธีแก้ปัญหาอย่างง่ายๆ และได้ทดลองมา เป็นวิธีที่เรียกว่า Request / Reply Model เดี๋ยวจะกล่าวในหัวข้อถัดไป
วัตถุประสงค์ในเนื้อหานี้ จะเป็นการนำวิธี Implement และ Config ระบบ Queue ด้วย NestJS มาทดลองการใช้งาน ให้ได้ตามโจทย์และความต้องการของผู้เขียน ซึ่งก่อนอื่น คงต้องเล่าเนื้อหาที่เกี่ยวข้องบางส่วนก่อน
ฝากกดโฆษณา Google Ads สัก click เพื่อเป็นกำลังใจแก่ผู้เขียนด้วยนะครับ
Message Queue Concept
โดยหลักการพื้นฐานในเรื่องของ รับ-ส่งข้อความในระบบ Queue (Messaging Queue) จะมีส่วนประกอบ คือ
Queue หรือ Channel - ช่องทางหรือตัวเชื่อมระหว่าง ผู้ส่ง (Producer, Sender) และ ผู้รับ (Consumer, Receiver)
Producer หรือ Sender - ทำหน้าที่ในการส่งข้อความไปยัง Message Queue
Consumer หรือ Receiver ทำหน้าที่รับข้อความจาก Message Queue ซึ่งสามารถรับได้แบบ ถูกเลือกให้รับ หรือ เลือกที่จะรับก็ได้
เมื่อมีการส่งข้อความไปยัง Message Queue ตามภาพด้านบน ข้อความจะถูกจัดเก็บเอาไว้ชั่วคราวเพื่อรอการส่งต่อไปยัง Consumer
ในรูปแบบการส่งในเนื้อหาบทความนี้ จะอยู่ในเรื่องของ Point-To-Point (PTP) จากการสรุปของพี่สมเกียรติ ตามด้านล่างนั้น ค่อนข้างที่จะตรงกับการ Implement ในบทความนี้จึงยกมาอ้างอิงตามด้านล่าง
"Message จะถูก consume จาก consumer เดียวเท่านั้น ถึงแม้จะมี consumer มากกว่า 1 ตัวก็ตาม ทำให้เรามั่นใจได้ว่า จะไม่มี consumer มารับ message ไปทำงานซ้ำแน่นอน การส่งข้อมูลจะตัวกลางซึ่งเราจะเรียกว่า queue (FIFO – First In First Out)"
"Request/Reply model คือเมื่อส่งข้อมูลเข้า queue แล้วจะต้องรอการตอบกลับจาก consomer ด้วยว่าเป็นอย่างไร นั่นทำให้ producer และ consumer ผูกมัดกัน หรืออาจจะเรียกได้ว่า ทำงานแบบ Synchronous"
เมื่อเข้าใจเนื้อหาเบื้องต้นที่พูดถึงในบทความนี้กันแล้วจะมาพูด ข้อดีที่จะช่วยให้เข้าใจว่าทำไมจึงต้องใช้ระบบคิว มันช่วยอะไรคุณได้บ้าง เป็นยกข้อดีของการใช้ระบบคิวมาเนื้อหาของ Queue NestJs
- สามารถเพิ่มงานเข้าไปในระบบคิว เพื่อทำงานพร้อมๆกันได้ และยังเพิ่มคิวใหม่ได้ตลอดเวลา
- มีฟังก์ชันสร้างความน่าเชื่อถือของระบบ ตัวอย่างเช่น มี listening ค่อยฟัง Event ต่างๆ ทั้ง Complete, Error, Fail ให้สามารถจัดการ Task (Job) ต่างๆ ให้ใช้งานได้อย่างน่าเชื่อถือ
NestJs จะใช้ Bull Library ในการทำระบบ Queue Management ซึ่งจะใช้ Redis เป็น Persist Job ให้เก็บข้อความ หรือ ทำ Message Queue งาน ดัง flow ตามภาพด้านล่าง Producer จะส่งข้อความไปให้ Redis ที่ทำหน้าที่เป็น Message Queue แล้วจึงค่อยส่งข้อความไปยัง Consumer เพื่อ Process ต่อไป
จาก Flow เราก็จะมา Code Producer และ Consumer กัน
ติดตั้ง Nest Queue ใน Project
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
Config เรียกใช้ Module ที่ไฟล์ app.module.ts
ตรง forRoot method จะเป็นการ Register bull package ให้เข้าใน application และสามารถ config Properties ในการใช้งานได้
นี้คือตัวอย่างที่ผู้เขียนตั้งค่าเอาไว้ Module อธิบายได้ดังนี้
limiter คือการกำหนด Job ให้ทำงานทีละเท่าไร หลังจากที่ Queue Complete สมมติว่าผู้เขียนอยากให้ queue แต่ละ queue ทำงานให้เสร็จทีละ queue ก่อนที่จะทำ queue ถัดไปก็กำหนดให้ max เท่ากับ 1 หรือถ้าต้องการกำหนดค่าอื่นๆ queue ก็จะทำงานพร้อมๆกันตามจำนวนที่กำหนด
ในส่วนของ backoffStrategies
ใช้ในกรณีที่ queue fail โดยกำหนดชื่อ และ callback
Building Producer
ในส่วนของ Producer ก็เหมือนกับเป็น Controller ตัวหนึ่งหากใครเคยเขียน Rest Api ด้วย Nest มาบ้าง ก็จะคล้ายๆกัน โดยที่เราสามารถที่จะกำหนด Method POST, GET เข้ามา โดยที่เราจะต้อง Injectable Queue Package เอาไว้ที่ controller
constructor(@InjectQueue('audio') private readonly audioQueue: Queue) { }
จำลองว่ามี Message เข้ามาในระบบจำนวน 38 Message ผู้เขียนสร้างเอาไว้เป็น Array
let array1 = [
"message1", "message2", "message3",
"message4", "message5", "message6", "message7", "message8",
"message9", "message10", "message11", "message12", "message13",
"message14", "message15", "message16",
"message17", "message18", "message19", "message20", "message21",
"message22", "message23", "message24",
"message25", "message26", "message27", "message28", "message29",
"message30", "message31", "message32",
"message33", "message34", "message35", "message36", "message37",
"message38"
];
สมมติว่าจากโจทย์ที่ตั้งเอาไว้ตอนต้นว่า กรณีที่ Server กำหนดให้ส่ง Message มาได้แค่ 10 Message เราจะต้องทำอย่างไร วิธีการคือ ทำ Chunk ให้กับชุด Message โดยส่งชุด Array เข้า function นี้ แล้วกำหนด ChunkSize ว่าในแต่ละชุดจะให้มีอย่างละกี่ Array
function chunkArray(array, chunkSize) {
return Array.from(
{ length: Math.ceil(array.length / chunkSize) },
(_, index) => array.slice(index * chunkSize, (index + 1) * chunkSize)
);
}
จากนั้น ก็เรียก Method Add() ของ Queue Instance this.audioQueue.add () โดย Argument Parameter จะมี add('ชื่อของ process', Payload , JobOption) ในตัวอย่างด้านล่างจะกำหนด ชื่อของ Process ไว้ว่า transcode ชื่อตรงนี้จะต้องนำไป ไว้ที่ Consumer ด้วยเพื่อรับ Message ในส่วนของ Payload ก็จะเป็น Function ในการแบ่ง Array ทั้งนี้เพื่อ add เข้าไปใน Queue และสุดท้ายจะเป็น JobOption ผู้เขียนได้กำหนด Setting Job นี้ไว้ว่า attempts : 3 หมายถึง จะทำ Re-Queue ของ JobId เดิม 3 ครั้ง กรณีที่ Queue Fail เราสามารถ กำหนด Fail Job ได้จาก Consumer ส่วน backoff คือ ค่าหน่วงเวลา ของแต่ละครั้งที่ re-queue จะเป็นการกำหนดด้วย Callback ชื่อ jitter สามารถตั้งชื่ออะไรก็ได้ เอาไว้ที่ Module ที่ผู้เขียนเอาไว้ในส่วนของ Module
for (let i = 0; i < chunkArray(array1, chunkSize).length; i++) {
this.audioQueue.add(
'transcode',
chunkArray(array1, chunkSize)[i],
{ // setting JOB
attempts: 3,
backoff: {
type: 'jitter',
}
}
);
}
import { CreateTaskDTO } from './dto/create-task.dto';
import { InjectQueue, OnGlobalQueueCompleted } from '@nestjs/bull';
import { Body, Controller, Post, Get, Injectable } from '@nestjs/common';
import { Job, Queue } from 'bull';
import { job } from 'cron';
@Controller('audio')
export class AudioController {
constructor(@InjectQueue('audio') private readonly audioQueue: Queue) { }
@Get('test')
async transcode() { // @Body() createTaskDTO: CreateTaskDTO
let array1 = [
"message1", "message2", "message3", "message4", "message5",
"message6", "message7", "message8", "message9", "message10",
"message11", "message12", "message13",
"message14", "message15", "message16",
"message17", "message18", "message19",
"message20", "message21",
"message22", "message23", "message24",
"message25", "message26", "message27",
"message28", "message29",
"message30", "message31", "message32",
"message33", "message34", "message35",
"message36", "message37", "message38" ];
let chunkSize = 5;
for (let i = 0; i < chunkArray(array1, chunkSize).length; i++) {
this.audioQueue.add(
'transcode',
chunkArray(array1, chunkSize)[i],
{ // setting JOB
attempts: 3,
backoff: {
type: 'jitter',
}
}
);
}
let waiting = await this.audioQueue.getWaiting();
waiting.forEach(function (message) {
console.log("waiting_id is " + message.id);
console.log("waiting_data is " + message.data);
});
function chunkArray(array, chunkSize) {
return Array.from(
{ length: Math.ceil(array.length / chunkSize) },
(_, index) => array.slice(index * chunkSize, (index + 1) * chunkSize)
);
}
}
}
Building Consumer
ในส่วนของ processorจะต้องกำหนด audio ซึ่งจะตรงกับ @InjectQueue('audio') ใน producer
และต้องกำหนด transcode เหมือนเป็น channel ที่รับจาก producer เช่นกัน จากนั้นก็นำ job ที่ได้จากการ callback มา handle message ว่าจะเอาไปทำอะไรต่อนั้นเอง
@Injectable()
@Processor('audio')
export class AudioProcessor {
constructor(private appService: AppService) { }
@Process('transcode')
async transcode(job: Job<unknown>) {
await this.appService.listCall()
.then(data => {
job.moveToCompleted('done', true);
// RUN PROCESS
this.logger.debug('jobId already Run On Process : ' + job.id);
})
.catch(err => {
job.moveToFailed({ message: 'fail' });
});
}
}
ถ้า อยากทำ Event Listening ด้วยก็สามารถทำได้ โดยเติมเพื่อ loging หรือ event handle
@OnQueueWaiting()
onWaiting(jobId: string) {
this.logger.debug(`OnQueueWaiting : ${jobId}`);
}
คราวนี้ก็มาดูผลลัพท์กัน (อย่าลืมติดตั้ง Redis Server แล้ว start มันด้วยนะ)
จะสังเกตุได้ว่า มีทั้ง queue ที่รออยู่และ queue ที่ active แล้ว
จริงๆแล้ว NestJs ยังสามารถ Handle Queue ได้หลากหลาย สามารถที่จะดูจาก document
REF:
https://docs.nestjs.com/techniques/queues
https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue
https://github.com/OptimalBits/bull/blob/master/PATTERNS.md#custom-backoff-strategy