Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianna-chang-shopify authored and george-ma committed Nov 6, 2024
1 parent d12a42d commit d03c7bc
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,58 @@ def insert_fixture(fixture, table_name)
end

def insert_fixtures_set(fixture_set, tables_to_delete = [])
fixture_inserts = build_fixture_statements(fixture_set)
table_deletes = tables_to_delete.map { |table| "DELETE FROM #{quote_table_name(table)}" }
statements = table_deletes + fixture_inserts
# If async is enabled, and we're not in a transaction, insert fixtures async. Otherwise, fallback to multi-statement inserts.
if async_enabled? && current_transaction.closed?
queries = []
fixture_set.each do |table_name, fixtures|
next if fixtures.empty?

insert_sql = build_fixture_sql(fixtures, table_name)
statement = tables_to_delete.include?(table_name) ? "DELETE FROM #{quote_table_name(table_name)};\n" : "" # Combine delete and insert into single statement since it's async
statement += transform_query(insert_sql)

query_label = "Fixture Load for #{table_name}"
# puts "Scheduling #{query_label} for async execution"
result = ActiveRecord::FutureResult::InsertFixtures.new(pool, statement, query_label)
result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
queries << result
end
# puts "waiting for queries to complete"
begin
queries.each(&:result)
rescue => e
puts "Error: #{e}"
# truncate instead of rolling back; can't rollback async queries
truncate_tables(*tables_to_delete)
end
else
fixture_inserts = build_fixture_statements(fixture_set)
table_deletes = tables_to_delete.map { |table| "DELETE FROM #{quote_table_name(table)}" }
statements = table_deletes + fixture_inserts

with_multi_statements do
transaction(requires_new: true) do
disable_referential_integrity do
execute_batch(statements, "Fixtures Load")
end
end
end
end
end

transaction(requires_new: true) do
def exec_fixtures_insert(*args, **kwargs)
with_multi_statements do
disable_referential_integrity do
execute_batch(statements, "Fixtures Load")
with_raw_connection do |conn|
# Simulate something raising to verify table truncating happens correctly
# if args[0].match?(/author_addresses/)
# puts "sending malformed SQL"
# args[0] = "BAD QUERY;"
# end
internal_exec_query(*args, **kwargs)
# This only works for trilogy rn; will need to make work for other adapters using multi-statement client API
conn.next_result while conn.more_results_exist?
end
end
end
end
Expand Down
7 changes: 7 additions & 0 deletions activerecord/lib/active_record/future_result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,12 @@ def exec_query(*, **)
ActiveRecord::Result.empty
end
end

class InsertFixtures < FutureResult # :nodoc:
private
def exec_query(connection, *args, **kwargs)
connection.exec_fixtures_insert(*args, **kwargs)
end
end
end
end
168 changes: 112 additions & 56 deletions activerecord/test/cases/fixtures_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ def test_bulk_insert_multiple_table_with_a_multi_statement_query
subscriber = InsertQuerySubscriber.new
subscription = ActiveSupport::Notifications.subscribe("sql.active_record", subscriber)

create_fixtures("bulbs", "movies", "computers")
conn = ActiveRecord::Base.lease_connection

conn.stub(:async_enabled?, false) do
create_fixtures("bulbs", "movies", "computers")
end

expected_sql = <<~EOS.chop
INSERT INTO #{quote_table_name("bulbs")} .*
INSERT INTO #{quote_table_name("movies")} .*
INSERT INTO #{quote_table_name("computers")} .*
INSERT INTO #{conn.quote_table_name("bulbs")} .*
INSERT INTO #{conn.quote_table_name("movies")} .*
INSERT INTO #{conn.quote_table_name("computers")} .*
EOS
assert_equal 1, subscriber.events.size
assert_match(/#{expected_sql}/, subscriber.events.first)
Expand All @@ -124,7 +128,10 @@ def test_bulk_insert_with_a_multi_statement_query_raises_an_exception_when_any_i

assert_no_difference "Aircraft.count" do
assert_raises(ActiveRecord::NotNullViolation) do
ActiveRecord::Base.lease_connection.insert_fixtures_set(fixtures)
conn = ActiveRecord::Base.lease_connection
conn.stub(:async_enabled?, false) do
conn.insert_fixtures_set(fixtures)
end
end
end
end
Expand Down Expand Up @@ -201,7 +208,7 @@ def test_bulk_insert_with_multi_statements_enabled
end
end

def test_bulk_insert_with_multi_statements_disabled
def test_bulk_insert_with_async_and_multi_statements_disabled
adapter_name = ActiveRecord::Base.lease_connection.adapter_name
run_without_connection do |orig_connection|
case adapter_name
Expand Down Expand Up @@ -234,7 +241,9 @@ def test_bulk_insert_with_multi_statements_disabled

assert_difference "TrafficLight.count" do
conn = ActiveRecord::Base.lease_connection
conn.insert_fixtures_set(fixtures)
conn.stub(:async_enabled?, false) do
conn.insert_fixtures_set(fixtures)
end
end

assert_raises(ActiveRecord::StatementInvalid) do
Expand All @@ -250,59 +259,65 @@ def test_bulk_insert_with_multi_statements_disabled
end
end

def test_insert_fixtures_set_raises_an_error_when_max_allowed_packet_is_smaller_than_fixtures_set_size
def test_insert_fixtures_set_using_multi_statements_raises_an_error_when_max_allowed_packet_is_smaller_than_fixtures_set_size
conn = ActiveRecord::Base.lease_connection
mysql_margin = 2
packet_size = 1024
bytes_needed_to_have_a_1024_bytes_fixture = 906
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * bytes_needed_to_have_a_1024_bytes_fixture] },
]
}
conn.stub(:async_enabled?, false) do # Test the multi-statements path
mysql_margin = 2
packet_size = 1024
bytes_needed_to_have_a_1024_bytes_fixture = 906
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * bytes_needed_to_have_a_1024_bytes_fixture] },
]
}

conn.stub(:max_allowed_packet, packet_size - mysql_margin) do
error = assert_raises(ActiveRecord::ActiveRecordError) { conn.insert_fixtures_set(fixtures) }
assert_match(/Fixtures set is too large #{packet_size}\./, error.message)
conn.stub(:max_allowed_packet, packet_size - mysql_margin) do
error = assert_raises(ActiveRecord::ActiveRecordError) { conn.insert_fixtures_set(fixtures) }
assert_match(/Fixtures set is too large #{packet_size}\./, error.message)
end
end
end

def test_insert_fixture_set_when_max_allowed_packet_is_bigger_than_fixtures_set_size
def test_insert_fixture_set_using_multi_statements_when_max_allowed_packet_is_bigger_than_fixtures_set_size
conn = ActiveRecord::Base.lease_connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 51] },
]
}
conn.stub(:async_enabled?, false) do
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 51] },
]
}

conn.stub(:max_allowed_packet, packet_size) do
assert_difference "TrafficLight.count" do
conn.insert_fixtures_set(fixtures)
conn.stub(:max_allowed_packet, packet_size) do
assert_difference "TrafficLight.count" do
conn.insert_fixtures_set(fixtures)
end
end
end
end

def test_insert_fixtures_set_split_the_total_sql_into_two_chunks_smaller_than_max_allowed_packet
def test_insert_fixtures_set_using_multi_statements_split_the_total_sql_into_two_chunks_smaller_than_max_allowed_packet
subscriber = InsertQuerySubscriber.new
subscription = ActiveSupport::Notifications.subscribe("sql.active_record", subscriber)
conn = ActiveRecord::Base.lease_connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 450] },
],
"comments" => [
{ "post_id" => 1, "body" => "a" * 450 },
]
}
conn.stub(:async_enabled?, false) do
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 450] },
],
"comments" => [
{ "post_id" => 1, "body" => "a" * 450 },
]
}

conn.stub(:max_allowed_packet, packet_size) do
conn.insert_fixtures_set(fixtures)
conn.stub(:max_allowed_packet, packet_size) do
conn.insert_fixtures_set(fixtures)

assert_equal 2, subscriber.events.size
assert_operator subscriber.events.first.bytesize, :<, packet_size
assert_operator subscriber.events.second.bytesize, :<, packet_size
assert_equal 2, subscriber.events.size
assert_operator subscriber.events.first.bytesize, :<, packet_size
assert_operator subscriber.events.second.bytesize, :<, packet_size
end
end
ensure
ActiveSupport::Notifications.unsubscribe(subscription)
Expand All @@ -312,19 +327,21 @@ def test_insert_fixtures_set_concat_total_sql_into_a_single_packet_smaller_than_
subscriber = InsertQuerySubscriber.new
subscription = ActiveSupport::Notifications.subscribe("sql.active_record", subscriber)
conn = ActiveRecord::Base.lease_connection
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 200] },
],
"comments" => [
{ "post_id" => 1, "body" => "a" * 200 },
]
}
conn.stub(:async_enabled?, false) do
packet_size = 1024
fixtures = {
"traffic_lights" => [
{ "location" => "US", "state" => ["NY"], "long_state" => ["a" * 200] },
],
"comments" => [
{ "post_id" => 1, "body" => "a" * 200 },
]
}

conn.stub(:max_allowed_packet, packet_size) do
assert_difference ["TrafficLight.count", "Comment.count"], +1 do
conn.insert_fixtures_set(fixtures)
conn.stub(:max_allowed_packet, packet_size) do
assert_difference ["TrafficLight.count", "Comment.count"], +1 do
conn.insert_fixtures_set(fixtures)
end
end
end
assert_equal 1, subscriber.events.size
Expand All @@ -333,6 +350,45 @@ def test_insert_fixtures_set_concat_total_sql_into_a_single_packet_smaller_than_
end
end

if current_adapter?(:Mysql2Adapter, :TrilogyAdapter)
def test_insert_fixtures_async
async_queries = []
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql].match?(/INSERT/)
query = {
sql: event.payload[:sql],
executed: true,
async: event.payload[:async],
}
async_queries << query
end
end

create_fixtures("bulbs", "movies", "computers")

expected_sql_queries = [
/DELETE FROM `bulbs`;\nINSERT INTO `bulbs`/,
/DELETE FROM `movies`;\nINSERT INTO `movies`/,
/DELETE FROM `computers`;\nINSERT INTO `computers`/,
]

assert_equal expected_sql_queries.size, async_queries.size
expected_sql_queries.each_with_index do |query, i|
assert_match(query, async_queries[i][:sql])
assert async_queries[i][:executed]
assert async_queries[i][:async]
end
ensure
ActiveSupport::Notifications.unsubscribe(subscriber)
end

def test_insert_fixtures_async_uses_multi_statement_when_in_transaction
end

def test_insert_fixtures_async_truncates_tables_on_exception_and_raises
end
end

def test_auto_value_on_primary_key
fixtures = [
{ "name" => "first", "wheels_count" => 2 },
Expand Down

0 comments on commit d03c7bc

Please sign in to comment.