How to Integrate Refinery + Rails 3.2 into Your Existing Rails App

Why?

Refinery CMS is a polished CMS that integrates seamlessly into an existing Rails application.

How?

Refinery breaks its components into individual engines, so we can pick and choose the engines we want in our application. Refinery’s authentication engine uses Devise; however, we want complete control over Devise in our own Rails application.

Using Amazon’s Simple Workflow Service from Rails

One of the projects I’m working on involves scraping information from Amazon product listings. It currently uses Delayed Job, but we’re running into an issue with long-running processes dying off.
To deal with this, we are considering a switch to Amazon’s Simple Workflow Service (SWF). SWF moves the task queue to Amazon’s servers, and client programs poll it for tasks.
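To make the polling model concrete, here is a toy, in-process analogy in plain Ruby: a `Queue` stands in for an SWF task list and a worker thread polls it for work. It only illustrates the pattern; it does not use the SWF API and models none of SWF’s timeouts or retries. The ASINs are made-up placeholders.

```ruby
require "thread"

# Toy analogy only: a Queue stands in for an SWF task list held by a
# central service, and a worker thread polls it for work.
task_list = Queue.new
results   = Queue.new

worker = Thread.new do
  loop do
    task = task_list.pop        # blocks until a task is available
    break if task == :stop      # sentinel telling the worker to exit
    results << "scraped #{task}"
  end
end

# A "client" enqueues work (hypothetical ASINs), then a stop sentinel.
%w[B000000001 B000000002].each { |asin| task_list << asin }
task_list << :stop
worker.join

output = []
output << results.pop until results.empty?
puts output
```

The point of the analogy is the division of labor: whoever queues work never talks to the worker directly, so a worker process can die and be restarted without losing the queue.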
For the purpose of this article, I’m going to build a simplified version of the application that only pulls down prices. The data structures for this are straightforward:

  • rails new swf_scraper
  • rails generate scaffold Product asin:string
  • rails generate scaffold Record price:float product_id:integer

Connect the records to the products by adding has_many to the Product model and belongs_to to the Record model.
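With the associations added, the two models look roughly like this. This is a sketch assuming the Rails 3.2 scaffold output, whose models typically include attr_accessible lines; those matter here because the scrape activity creates records through mass assignment. It needs a Rails app to run.

```ruby
# app/models/product.rb
class Product < ActiveRecord::Base
  attr_accessible :asin   # as generated by the Rails 3.2 scaffold
  has_many :records       # product.records returns this product's price records
end

# app/models/record.rb
class Record < ActiveRecord::Base
  attr_accessible :price, :product_id
  belongs_to :product     # joins through the product_id column
end
```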

Accessing the workflow service requires adding the aws-sdk and aws-flow gems to the Gemfile; the scraping code below also uses the httparty and nokogiri gems.
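A sketch of the Gemfile additions, followed by a `bundle install`. Pinning aws-flow to 1.0.0 matches the version this article was written against; leaving the other gems unpinned is an assumption, so adjust versions to your setup.

```ruby
# Gemfile (excerpt)
gem 'aws-sdk'
gem 'aws-flow', '1.0.0'   # the aws-flow version used in this article
gem 'httparty'            # fetches product pages in the scrape activity
gem 'nokogiri'            # parses the fetched HTML
```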

SWF uses workflows to define the order of activity execution. The workflow for this project is:

class ScrapeWorkflow
  extend AWS::Flow::Workflows

  workflow :queue_scrape do
    {
      :version => "1.1",
      :task_list => SWF_WORKFLOW_TASK_LIST,
      :execution_start_to_close_timeout => 10 * 60,   # seconds
    }
  end

  # Exposes the activities declared in ScrapeActivity as `activity`.
  activity_client(:activity) { {:from_class => "ScrapeActivity"} }

  def queue_scrape(asin)
    # Schedule the scrape activity asynchronously; send_async returns a Future.
    scrape_future = activity.send_async(:scrape, asin)
    # wait_for_all(scrape_future)
  end
end

There is just one task in this workflow: scrape the ASIN. The activity is where the actual processing takes place:

class ScrapeActivity
  extend AWS::Flow::Activities

  activity :scrape do
    {
      :version => "1.1",
      :default_task_list => SWF_ACTIVITY_TASK_LIST,
      :default_task_schedule_to_start_timeout => 10 * 60,   # seconds
      :default_task_start_to_close_timeout => 30,           # seconds
    }
  end

  def initialize
    @count = 0
  end

  def scrape(asin)
    @count += 1

    # ASINs are alphanumeric, so the URL needs no escaping.
    url = "http://www.amazon.com/dp/" + asin
    response = HTTParty.get(url)
    doc = Nokogiri::HTML(response.body)

    # Try the older price markup first, then the newer layout. The number
    # must start with a digit so stray punctuation isn't read as 0.0.
    price_div = doc.at_css('.priceLarge')
    price_text = price_div && price_div.text[/\d[\d,]*(?:\.\d+)?/]

    unless price_text
      price_div = doc.at_css('.a-color-price.a-size-large')
      price_text = price_div && price_div.text[/\d[\d,]*(?:\.\d+)?/]
    end

    # Strip thousands separators before converting, e.g. "1,299.99" -> 1299.99
    price = price_text && price_text.delete(',').to_f

    if price
      product = Product.find_by_asin(asin)
      product.records.create(price: price)
    end

    puts "#{@count} Scraped: #{asin}: #{price}"
  rescue => e
    # Log the error instead of letting it propagate out of the worker.
    puts "Error: #{e.message}"
  end
end
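Price extraction is the part most likely to break when Amazon changes its markup, so it can help to pull the string handling into a plain method that can be tested without fetching a page. The `parse_price` name is hypothetical; the pattern requires a leading digit so that text like "Currently unavailable." yields nil rather than 0.0.

```ruby
# Hypothetical helper: returns the first number in a price string
# (e.g. "$1,299.99") as a Float, or nil when the text holds no number.
def parse_price(text)
  return nil if text.nil?

  # Digits with optional thousands separators and an optional decimal part.
  number = text[/\d[\d,]*(?:\.\d+)?/]
  number && number.delete(",").to_f   # drop separators before converting
end

puts parse_price("$1,299.99")
puts parse_price("$25.00 - $30.00")
```

Inside the activity, each selector lookup would then reduce to `parse_price(price_div && price_div.text)`.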

The last piece of the puzzle is queuing the jobs and running the workflow and activity workers. This is accomplished with a set of rake tasks:

require "#{Rails.root}/app/helpers/application_helper"
include ApplicationHelper

require "aws/decider"   # entry point of the aws-flow gem
require "#{Rails.root}/config/initializers/swf.rb"
require "#{Rails.root}/lib/scrape_activity.rb"
require "#{Rails.root}/lib/scrape_workflow.rb"

namespace :swf do
  desc 'Start activity worker'
  task :activity => :environment do
    swf, domain = swf_domain
    # Run activity tasks in this process rather than forking per task.
    activity_worker = AWS::Flow::ActivityWorker.new(swf.client, domain, SWF_ACTIVITY_TASK_LIST, ScrapeActivity) { {:use_forking => false} }
    activity_worker.start
  end

  desc 'Start workflow worker'
  task :workflow => :environment do
    swf, domain = swf_domain
    worker = AWS::Flow::WorkflowWorker.new(swf.client, domain, SWF_WORKFLOW_TASK_LIST, ScrapeWorkflow)
    worker.start
  end

  desc 'Queue activities'
  task :scrape => :environment do
    swf, domain = swf_domain
    my_workflow_client = AWS::Flow.workflow_client(swf.client, domain) { {:from_class => "ScrapeWorkflow"} }

    # Start one workflow execution per product.
    Product.all.each do |product|
      my_workflow_client.start_execution(product.asin)
    end
  end
end
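The rake file also requires config/initializers/swf.rb, which isn’t shown here. All it needs to do is define the three constants used by the workflow, activity, and rake tasks. A hypothetical version: the constant names come from the code above, but the values are placeholders.

```ruby
# config/initializers/swf.rb (hypothetical values)
SWF_DOMAIN             = "swf_scraper"                 # SWF domain to register or reuse
SWF_WORKFLOW_TASK_LIST = "scrape_workflow_task_list"   # polled by rake swf:workflow
SWF_ACTIVITY_TASK_LIST = "scrape_activity_task_list"   # polled by rake swf:activity
```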

Setting up the client and domain is done by a helper method:

module ApplicationHelper
  # Configures the AWS SDK from config/aws.yml and returns the SWF object
  # along with the domain, registering the domain on first use.
  def swf_domain
    config_file = File.read("#{Rails.root}/config/aws.yml")
    AWS.config(YAML.load(config_file))

    @swf = AWS::SimpleWorkflow.new
    begin
      # Second argument: workflow history retention period, in days.
      @domain = @swf.domains.create(SWF_DOMAIN, "10")
    rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault
      @domain = @swf.domains[SWF_DOMAIN]
    end

    return @swf, @domain
  end
end
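The helper expects config/aws.yml to deserialize into a hash of AWS.config options. The sketch below assumes the file holds the aws-sdk v1 credential keys access_key_id and secret_access_key (placeholders here); the `load_aws_config` name is hypothetical, and symbolizing the keys is a defensive choice so they match the SDK’s option names.

```ruby
require "yaml"

# Hypothetical contents of config/aws.yml; replace the placeholders.
aws_yml = "access_key_id: YOUR_ACCESS_KEY_ID\n" \
          "secret_access_key: YOUR_SECRET_ACCESS_KEY\n"

# Parse the YAML (no arbitrary objects allowed) and symbolize the keys.
def load_aws_config(yaml_text)
  raw = YAML.safe_load(yaml_text) || {}
  raw.each_with_object({}) { |(key, value), opts| opts[key.to_sym] = value }
end

config = load_aws_config(aws_yml)
p config
```

swf_domain could then pass `load_aws_config(File.read("#{Rails.root}/config/aws.yml"))` to AWS.config.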