Procesando archivos en background con Rails


Procesar archivos en paneles de administración es un bajón, sobre todo cuando son cada vez más grande y cada vez más. En uno de nuestros proyectos tenemos ya unos cinco archivos CSV diferentes a procesar donde cada uno cumple alguna función específica. Algunos de estos archivos superan los 5 Mb por upload, por lo que se hizo necesario desacoplar el procesamiento del archivo del request que realiza el upload.

Para manejar procesos en background en rails hay muchas opciones buenas, algunas más complejas con colas de trabajo, workers distribuidos y otras cosas que nosotros no necesitamos. Lo importante para nosotros era encontrar alguno que permita :

  • Procesar en background (obvio :D)
  • Poder saber en que estado está el trabajo (% completado)
  • Al menos un proceso en background a la vez (si hay más que espere su turno, los uploads son semanales o mensuales por lo que no hay tanto solapamiento entre tareas como para justificar múltiples workers)

Y nada más. Buscando y buscando caí con BackgroundFu que cumplía con lo necesario.

Procesando los archivos

Como todos los archivos son de texto con algún separador (algunos ‘;’, otros tabs o comas) empezamos por lo básico y crear una clase base que maneje algunos parámetros comunes.

En BackgroundFu uno puede encolar cualquier clase, por lo que nuestro worker es simplemente una clase con un método process (que también es arbitrario el nombre, podría ser cualquier otro).

Como el archivo que sube el usuario está en un directorio temporal, antes de mandar al proceso en background en necesario guardarlo en algún lugar seguro, para poder saber que al momento de procesarlo va a seguir estando, para ello encapsulamos este comportamiento en un método especial para encolar.

Nuestra completa termina siendo algo como (más adelante veremos en detalle algunas cosas) :

class FileWorker
  include BackgroundFu::WorkerMonitoring
 
  attr_reader :total_lines
 
  def self.enqueue(file, *args)
    filename = "#{RAILS_ROOT}/tmp/#{self.name}-#{Time.now.to_i}"
    File.open(filename, "wb") { |f| f.write(file.read) }
    Job.enqueue!(self.name, :'process', filename, *args).id
  end
 
  def process(filename, *args)
    @lines = 0
    @total_lines = `wc -l #{filename}`.to_i
    record_progress(0)
 
    result = real_process(filename, *args)
 
    record_progress(100)
    File.unlink(filename)
 
    result
  end
 
  protected
  def line_processed!
    @lines += 1
 
    update(@lines)
  end
 
  def update(lines)
    record_progress(lines, @total_lines)
  end
end

Veamos un poco más en detalle algunas partes.

Encolado

  def self.enqueue(file, *args)
    filename = "#{RAILS_ROOT}/tmp/#{self.name}-#{Time.now.to_i}"
    File.open(filename, "wb") { |f| f.write(file.read) }
    Job.enqueue!(self.name, :'process', filename, *args).id
  end

Este método es quien guarda en un lugar seguro el archivo. Utilizando el nombre de la clase actual (recuerden que este método está en una clase base, de donde luego vamos a heredar) y un timestamp por las dudas que se lleguen a hacer 2 uploads (cosa que en la práctica no pasa, pero por las dudas está).

Luego de guardar el archivo se encola el trabajo pasándole el archivo (el path completo) y cualquier otro argumento que se quiera.

Esto hará que luego el daemon de background instancia la clase ‘self.name’ y llame al método process pasandole nuestros parámetros.

Procesamiento

El método genérico de procesamiento hace uso de un método que la clase base no tiene ‘real_process’ que es el método que las clases hijas definarán para realizar el trabajo final :

  def process(filename, *args)
    @lines = 0
    @total_lines = `wc -l #{filename}`.to_i
    record_progress(0)
 
    result = real_process(filename, *args)
 
    record_progress(100)
    File.unlink(filename)
 
    result
  end

En este caso el método se encarga de saber cual es el total de lineas (de una manera tal vez no muy linda, pero que anda :P) y borrar el archivo al terminar el proceso.

Un worker real

Para que esto tenga sentido, debemos crear un worker que haga realmente algo, entonces por ejemplo podríamos tener un worker que sume puntos a los usuarios desde un archivo :

class UserPointProcessor < FileWorker
  private
  def real_process(file)
    ActiveRecord::Base.transaction do
      FasterCSV.new(File.read(file), :col_sep => "\t").each do |row|
         # Hacer algo con row
 
         line_processed!
      end
    end
 
    "Podemos retornar algun valor o status de exito"
  end
end

De esta forma podemos ahora en algún controller hacer el encolado del trabajo :

  def add_points
    if request.post?
      redirect_to admin_job_path(UserPointProcessor.enqueue(params[:file])) and return
    end
 
    render 'file_upload'
  end

En este caso el template file_upload es un form con un file field para hacer el upload, sin ningún otro campo. La ruta admin_job_path nos la provee directamente BackgroundFu, generándonos también las vistas con AJAX para actualizar el estado, una linda barra de progreso que nos muestra el tiempo estimado para terminar entre otros datos útiles (o no tanto :D).

Leave a Reply