вівторок, 4 жовтня 2011 р.

Приклад роботи Delayed_job з DataMapper у Sinatra

Delayed_job (або DJ) бібліотека асинхронної черги фонових завдань написана на Ruby. Може знадобитись для таких типових задач як:
  • поштова розсилка
  • зміна розмірів зображень
  • конвертування аудіо/відео
  • HTTP завантаження
  • оновлення інформації
  • перевірка спаму
Спочатку написана Tobias Lütke як плагін для веб-фреймворка Rails з Active Record. На даний час підтримується компанією Collective Idea, і дозволяє використовувати ряд інших бекендів для збереження черги завдань.

 

Інсталяція

У цій статті Delayed_Job буде використовуватись разом з DataMapper.
Встановлюємо наступні gem'и:
$ gem install delayed_job delayed_job_data_mapper_ste

Вступ в Delayed_job

Бібліотека Delayed_job розвивається навколо таблиці delayed_jobs, яка має наступну структуру:
id
priorityДозволяє завданню перейти до початку черги
attemptsЗабезпечує повтори. По замовчуванню 0
handlerСеріалізований у YAML об'єкт, який буде оброблений
last_errorПричина останньої помилки
run_atКоли виконати. Може бути Time.zone.now для негайного виконання, або у майбутньому
locked_atВстановлюється, якщо клієнт працює на цьому об'єкті, тобто заблокований
failed_atВстановлюється, якщо всі спроби закінчилися невдачею (по замовчуванню, запис буде видалений)
locked_byХто працює на цьому об'єкті (якщо заблокований)

У разі невдачі, завдання повторюється кожні [5 секунд + N ** 4], де N - це число повторів.

По замовчуванню Worker.max_attempts рівне 25. Після цього завдання видаляється(по замовчуванню), або залишається у базі зі значенням у полі failed_at. Із встановленими по замовчуванню 25 спробами, останній повтор буде через 20 днів з останнім інтервалом майже 100 годин.

Worker.max_run_time по замовчуванню становить 4 години. Якщо завдання вимагає більше часу, доречним буде встановити це значення довшим.
Невдалі завдання по замовчуванню будуть видалятися (успішні видаляються завжди). Якщо бажаєте зберігати інформацію про невдалі завдання встановіть Delayed::Worker.destroy_failed_jobs = false. Невдалі завдання будуть позначені у полі failed_at.

По замовчуванню всі завдання плануються з пріоритетом 0, який є головним. Ви можете це змінити, встановивши Delayed::Worker.default_priority. Нижчі значення мають вищий пріоритет.

Можна відкласти виконання завдань з метою тестування, встановивши Delayed::Worker.delay_jobs = false, для виконання всіх завдань в реальному часі.

Ось приклади зміни параметрів завдань:
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.sleep_delay = 60
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 5 * 360
Delayed::Worker.delay_jobs = false

 

Розбір коду

Ось ми і підійшли до найцікавішого. Практично весь отриманий код легко зрозумілий і знайти його можна на GitHub.

Для початку розберемо структуру проекту, вона дуже проста:
/
|-  Rakefile
|-  app.rb
|-  initializer.rb

Файл initializer.rb містить в собі моделі, конфігурацію бази даних і все що стосується роботи фоновими завданнями.
app.rb містить весь код для Sinatra.
Rakefile містить виконуваний код Ruby для фонових завдань.

Для прикладу роботи з Delayed_job, ми створимо простий Sinatra-додаток.
Отже, почнемо зі структури URL'ів.
Головна сторінка ('/') по методу GET видає список фраз та їх переклад. Вона ж і займається введенням нових фраз по POST'у.
Сторінка '/translation' записує текст у базу даних.

# app.rb
require 'sinatra'
require 'slim'
require_relative 'initializer'

get '/' do
  @translations = Translation.all
  slim :index
end

post '/translation' do
  translation = Translation.create(:input => params[:input])
  translation.save
  redirect '/'
end

Нижче поміщаємо код шаблону на Slim:
# app.rb
__END__

@@index
h1 Translations
- @translations.each do |translation|
  ul
    li
      span= translation.input
      span →
      span= translation.output || '...pending...'

h2 New Translation
form method='post' action='/translation'
  ul
    li#input
      label for='translation_input' Input:
      br
      input type='text' id='translation_input' name='input'

  input type='submit' value='New'

Тепер подивимося, що міститься у файлі initializer.rb.
Наступний рядок вказує DJ використовувати DataMapper для збереження черги завдань:
Delayed::Worker.backend = :data_mapper

Підключаємо базу даних SQLite3:
DataMapper.setup(:default, "sqlite3://#{Dir.pwd}/db.sqlite3")

Настав час написати клас моделі:
class Translation
  include DataMapper::Resource

  property :id,     Serial
  property :input,  Text
  property :output, Text
end

Таблиця delayed_jobs для збереження черги завдань створюватиметься автоматично під час запуску додатку.

Прийшов час поговорити про винуватця бенкету.
Завданням(Job) є об'єкт Ruby, який має метод perform. Тобто будь-який об'єкт, який відповідає на perform може бути доданий у таблицю delayed_jobs. Об'єкти серіалізуються у YAML, так що в майбутньому вони можуть бути відновлені до початкового об'єкта.

Відразу після того як користувач натисне на кнопку "New", дані з параметру input запишуться в однойменне поле в таблиці translations. Після цього відбувається постановка об'єкта у чергу завдань для подальшого опрацювання методом Translation#perform.
class Translations
  # ...
  after :create do
    Delayed::Job.enqueue(self)
  end

  def perform
    puts 'Perform some job...'
    self.update( :output => input.reverse )
  end
end


Також можна визначити ряд інших методів для класу моделі, які будуть викликатися на різних стадіях процесу.
  • enqueue(job)
  • perform
  • before(job)
  • after(job)
  • success(job)
  • error(job, exception)
  • failure

В процесі роботи з Delayed_job я зіткнувся з проблемою десеріалізації об'єктів DataMapper. Як виявилось проблема відома і після мого запиту виправлений код був доданий в основну гілку delayed_job_data_mapper.

Використовуємо monkey patching в initializer.rb:
module DataMapper
  module Resource
    def self.yaml_new(klass, tag, val)
      klass.get(val['id'])
    end
  end
end

Для працездатності нашого коду слід також реалізувати метод sum з Active Support:
class Array; def sum; self.inject(:+); end; end

Файл під ім'ям Rakefile, розташований в кореневій директорії, містить наступне:
require_relative 'initializer'

namespace :dj do

  desc "Start DelayedJob worker"
  task :work do
    DataMapper::Logger.new($stdout, :info)
    Delayed::Worker.backend = :data_mapper
    Delayed::Worker.backend.auto_upgrade!
    Delayed::Worker.new.start
  end
end

Утиліта rake дозволяє створювати так звані завдання, які виконуються командою rake ІМЯ_ЗАВДАННЯ.

 

Запуск

Щоб запустити, виконайте наступні дії:
  1. В одному терміналі виконати: rake dj:work
  2. В іншому: ruby server.rb
  3. Перейти у переглядачі за адресою http://localhost:4567 і якщо все зроблено правильно, то насолоджуватися результатом.

 

Посилання

Немає коментарів: