diff --git a/lib/perfectqueue/backend/null.rb b/lib/perfectqueue/backend/null.rb deleted file mode 100644 index 76339bd..0000000 --- a/lib/perfectqueue/backend/null.rb +++ /dev/null @@ -1,27 +0,0 @@ -module PerfectQueue::Backend - class NullBackend - def list(&block) - nil - end - - def acquire(timeout, now=Time.now.to_i) - nil - end - - def finish(token, delete_timeout=3600, now=Time.now.to_i) - true - end - - def update(token, timeout) - nil - end - - def cancel(id, delete_timeout=3600, now=Time.now.to_i) - true - end - - def submit(id, data, time=Time.now.to_i, resource=nil, max_running=nil) - true - end - end -end diff --git a/lib/perfectqueue/backend/rdb.rb b/lib/perfectqueue/backend/rdb.rb deleted file mode 100644 index 0a16ed8..0000000 --- a/lib/perfectqueue/backend/rdb.rb +++ /dev/null @@ -1,82 +0,0 @@ -require 'sequel' -require 'uri' -require_relative 'rdb_compat' - -module PerfectQueue::Backend - class RDBBackend - MAX_RETRY = ::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY - DELETE_OFFSET = ::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET - class Token < Struct.new(:key) - end - - def initialize(uri, table, config={}) - @uri = uri - @table = table - - u = URI.parse(@uri) - options = { - max_connections: 1, - user: u.user, - password: u.password, - host: u.host, - port: u.port ? u.port.to_i : 3306 - } - options[:sslca] = config[:sslca] if config[:sslca] - db_name = u.path.split('/')[1] - @db = Sequel.mysql2(db_name, options) - - @mutex = Mutex.new - connect { - # connection test - } - end - - attr_reader :db - - def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second), resource=nil, max_running=nil) - connect { - begin - data = Sequel::SQL::Blob.new(data) - @db.sql_log_level = :debug - n = @db["INSERT INTO `#{@table}` (id, timeout, data, created_at, resource, max_running) VALUES (?, ?, ?, ?, ?, ?);", id, time, data, time, resource, max_running].insert - return true - rescue Sequel::UniqueConstraintViolation => e - return nil - end - } - end - - def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second)) - connect { - n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update - return n > 0 - } - end - - private - def connect(&block) - @mutex.synchronize do - retry_count = 0 - begin - block.call - rescue - # workaround for "Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction" error - if $!.to_s.include?('try restarting transaction') - err = ([$!] + $!.backtrace.map {|bt| " #{bt}" }).join("\n") - retry_count += 1 - if retry_count < MAX_RETRY - STDERR.puts err + "\n retrying." - sleep 0.5 - retry - else - STDERR.puts err + "\n abort." - end - end - raise - ensure - @db.disconnect - end - end - end - end -end diff --git a/spec/null_backend_spec.rb b/spec/null_backend_spec.rb deleted file mode 100644 index c0c5b32..0000000 --- a/spec/null_backend_spec.rb +++ /dev/null @@ -1,30 +0,0 @@ -require 'spec_helper' -require 'perfectqueue/backend/null' - -describe Backend::NullBackend do - let (:backend){ Backend::NullBackend.new } - describe '#list' do - subject { backend.list{} } - it { is_expected.to be_nil } - end - describe '#acquire' do - subject { backend.acquire(double('timeout')) } - it { is_expected.to be_nil } - end - describe '#finish' do - subject { backend.finish(double('token')) } - it { is_expected.to be true } - end - describe '#update' do - subject { backend.update(double('token'), double('timeout')) } - it { is_expected.to be_nil } - end - describe '#cancel' do - subject { backend.cancel(double('id')) } - it { is_expected.to be true } - end - describe '#submit' do - subject { backend.submit(double('id'), double('data')) } - it { is_expected.to be true } - end -end diff --git a/spec/rdb_backend_spec.rb b/spec/rdb_backend_spec.rb deleted file mode 100644 index a2c3982..0000000 --- a/spec/rdb_backend_spec.rb +++ /dev/null @@ -1,73 +0,0 @@ -require 'spec_helper' -require 'perfectqueue/backend/rdb' - -describe Backend::RDBBackend do - let (:now){ Time.now.to_i } - let (:uri){ 'mysql2://root:@localhost/perfectqueue_test' } - let (:table){ 'test_queues' } - let (:db) do - d = Backend::RDBCompatBackend.new(double, url: uri, table: table) - s = d.db - s.tables.each{|t| s.drop_table(t) } - d.init_database({}) - Backend::RDBBackend.new(uri, table) - end - - context '.new' do - it 'supports mysql' do - expect(Backend::RDBBackend.new(uri, table)).to be_an_instance_of(Backend::RDBBackend) - end - end - - context '#submit' do - it 'adds task' do - expect(db.submit('key', '{"foo":"bar"}')).to be true - row = db.db.fetch("SELECT * FROM `#{table}` WHERE id=? LIMIT 1", 'key').first - expect(row[:created_at]).not_to be_nil - expect(row[:data]).to eq('{"foo":"bar"}') - end - it 'returns nil for a duplicated task' do - expect(db.submit('key', '{"foo":"bar"}')).to be true - expect(db.submit('key', '{"foo":"bar"}')).to be_nil - end - end - - context '#cancel' do - let (:key){ 'key' } - context 'have the task' do - before do - db.submit(key, '{}') - end - it 'returns true' do - expect(db.cancel(key)).to be true - row = db.db.fetch("SELECT created_at FROM `#{table}` WHERE id=? LIMIT 1", key).first - expect(row[:created_at]).to be_nil - end - end - context 'already canceled' do - it 'returns false' do - expect(db.cancel(key)).to be false - end - end - end - - context '#connect' do - context 'normal' do - it 'returns nil' do - expect(db.__send__(:connect){ }).to be_nil - end - end - context 'error' do - it 'returns block result' do - expect(RuntimeError).to receive(:new).exactly(Backend::RDBBackend::MAX_RETRY).and_call_original - allow(STDERR).to receive(:puts) - allow(db).to receive(:sleep) - expect do - db.__send__(:connect) do - raise RuntimeError.new('try restarting transaction') - end - end.to raise_error(RuntimeError) - end - end - end -end