Mongo DB를 사용하여 예약 작업 구현하기

등록할 예약 작업의 성격

  • 특정 일시에 실행되어야 하는 작업
  • ex) 2015.11.31 09:00 OO에게 메일 발송

 

스케쥴 서버의 조건

  • 예약 작업이 많아 졌을 경우, scale-out 확장이 가능해야 함

예약 작업 저장을 위한 스토리지로 Mongo DB를 선택했으며, Collection 구조는 아래와 같이 정의하였다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// collection
var reservedTask{
    _id: xxx, //taskId
    cmd: OOOO, // 실행할 작업
    contents: {name:'aasfdsadf'} // 작업에 필요한 내용 Object Type
    reserved: YYYY-MM-DDTHH:MM:SS, // 실행될 시간
    registered: YYYY-MM-DDTHH:MM:SS, // 등록한 시간
    status: REQUEST // 작업 실행 상태
}
// task 등록
db.reservedTasks.save({
    cmd: 'sendMail',
    registered: new Date(),
    reserved: new Date('2015-11-13T09:00:00'),
    contents: {
        to: 'abc@def.com',
        body: '.......'
    },
    status: 'REQUEST'
});

구현할 기능은 아래와 같다

  1. 예약 작업 등록
  2. 실행할 작업을 DB로 부터 읽어 옴.
  3. 실행할 작업이 있을 경우, 상태를 실행으로 변경하고 예약 작업을 실행
  4. 실행을 완료한 작업은 완료로 처리

여러 서버에서 스캐쥴러가 동작할 경우 2, 3, 4번의 과정에서 동일한 작업이 여러번 실행되는 문제가 발생할 수 있다. 만약 서버 A가 2, 3번 과정을 실행중일때, 서버 B가 2번 과정을 실행하게 되면, 동일한 작업이 두번 실행되게 된다. 이러한 문제는 MongoDB로 구현할때 findAndModify 연산을 이용하여 해결할 수 있다.

findAndModify 연산을 이용한 예약 작업 읽기

1
2
3
4
5
db.reservedTasks.findAndModify({
    query: { reserved: new Date('2015-11-13T09:00:00'), status: 'REQUEST' },
    update: { $set: { status: 'RUNNING' } },
    new: true
});

위의 쿼리를 수행하게 되면 읽기와 쓰기가 동시에 수행이 되어, 여러 서버에서 동시에 수행이 되더라도 동일한 작업이 여러번 실행되는 것을 막을 수 있다. 또한 이후 작업이 결과에 따라 status 값을 조정함으로써 완료/실패 처리가 가능하다.

문제점
findAndModify를 사용할 경우 예약된 작업을 하나씩 실행해야 한다. 만약 1000개의 예약된 작업이 있을 경우 1000번 위의 과정을 수행해야 한다. 이러한 제한은 findAndModify가 하나의 document에 대해서만 update가 가능하기 때문이다. 이를 위해서는 다른 해결책이 필요하다.

 

이중 확인을 사용한 방법(transId 추가)

  1. 실행할 작업을 DB로 부터 읽어 옴.
  2. DB로 부터 읽어 온 작업을 업데이트(상태: 실행중, transId:’transactionNum’)
  3. transId가 ‘transactionNum’인 작업을 읽음
  4. 작업 완료후 처리한 작업을 완료로 처리
1
2
3
4
5
6
7
8
9
10
11
db.reservedTasks.find({ reserved: new Date('2015-11-13T09:00:00'),
status: 'REQUEST'
}).limit(100) // ID 읽음.
db.reservedTasks.update( {_id: {$in: [/* 앞에서 읽어온 id 리스트 */]},
        status: 'REQUEST'},// 읽은 ID중 아직까지 REQUEST 상태인 것만,
    {$set: {status: 'RUNNING',
                transId: 'transactionNum'}},
// transaction Num을 부여하고, 실행중 상태로
    {multi: true}
)
db.reservedTaks.find({transId: 'transactionNum'})// 실제로 실행할 작업을 다시 읽음

위 과정중 1 번 쿼리의 필요성에 대해 고민해 볼 필요가 있다. 1번 과정은 실제로 최대 N개로 한번의 작업 횟수를 제한하는데 필요하다. 이러한 제한이 필요 없을 경우 1번 과정은 생략 가능하다


+ Recent posts