#!/usr/bin/env ruby
require 'rubygems'
require 'mysql'
gem "postgres"
require 'postgres'
# Reads table definitions and row data from a MySQL database through the
# `mysql` gem. The connection is opened as soon as the reader is constructed.
class MysqlReader
  # Empty placeholder class; it is not referenced elsewhere in the visible code.
  class Field
  end

  # Describes a single MySQL table: its columns, indexes and foreign keys, plus
  # the SQL used to page through its rows. Metadata is loaded on first use and
  # memoized afterwards.
  class Table
    attr_reader :name

    # reader - the owning MysqlReader; supplies the live connection.
    # name   - the table name.
    def initialize(reader, name)
      @reader = reader
      @name = name
    end

    # Maps Mysql::Field::TYPE_* codes to lower-case type names. The constants
    # are resolved with const_get so that no string is evaluated as code.
    @@types = %w(tiny enum decimal short long float double null timestamp longlong int24 date time datetime year set blob string var_string char).inject({}) do |list, type|
      list[Mysql::Field.const_get("TYPE_#{type.upcase}")] = type
      list
    end
    # Code 246 is registered by hand as a second entry for "decimal".
    @@types[246] = "decimal"

    # Returns the memoized list of column description hashes (see load_columns).
    def columns
      @columns ||= load_columns
    end

    # Collapses a MySQL column type string (as printed by EXPLAIN) into the
    # simplified type name used by the writers. Order matters: "tinyint(1)" is
    # checked before the generic tinyint/int patterns. Types that match no rule
    # are returned unchanged.
    def convert_type(type)
      case type
      when "tinyint(1)"
        "boolean"
      when /tinyint/
        "tinyint"
      when /int/
        "integer"
      when /varchar/
        "varchar"
      when /decimal/
        "decimal"
      else
        type
      end
    end

    # Reconnects, runs EXPLAIN on the table and returns an array of hashes with
    # the keys :name, :table_name, :type, :length, :decimals, :null,
    # :primary_key, optionally :default, and, for primary-key columns, :maxval
    # (MAX(column) + 1, or 0 when that query yields NULL).
    def load_columns
      @reader.reconnect
      fields = []
      @reader.mysql.query("EXPLAIN `#{name}`") do |res|
        while (field = res.fetch_row)
          # Derived afresh for every row so a column without a length cannot
          # inherit the length of the column before it. The "(p,s)" form takes
          # precedence over the plain "(n)" form.
          length = field[1][/\((\d+),\d+\)/, 1] || field[1][/\((\d+)\)/, 1]
          desc = {
            :name => field[0],
            :table_name => name,
            :type => convert_type(field[1]),
            :length => length && length.to_i,
            :decimals => field[1][/\((\d+),(\d+)\)/, 2],
            :null => field[2] == "YES",
            :primary_key => field[3] == "PRI"
          }
          desc[:default] = field[4] unless field[4].nil? || field[4].empty?
          fields << desc
        end
      end
      fields.select { |field| field[:primary_key] }.each do |field|
        @reader.mysql.query("SELECT max(`#{field[:name]}`) + 1 FROM `#{name}`") do |res|
          field[:maxval] = res.fetch_row[0].to_i
        end
      end
      fields
    end

    # Returns the memoized index descriptions (see load_indexes).
    def indexes
      load_indexes unless @indexes
      @indexes
    end

    # Returns the memoized foreign-key descriptions (see load_indexes).
    def foreign_keys
      load_indexes unless @foreign_keys
      @foreign_keys
    end

    # Parses the output of SHOW CREATE TABLE line by line and fills both
    # @indexes and @foreign_keys. Only lines containing " KEY " are examined.
    # The foreign-key pattern recognises single-column constraints only.
    def load_indexes
      @indexes = []
      @foreign_keys = []
      @reader.mysql.query("SHOW CREATE TABLE `#{name}`") do |result|
        explain = result.fetch_row[1]
        explain.split(/\n/).each do |line|
          next unless line =~ / KEY /
          index = {}
          if match_data = /CONSTRAINT `(\w+)` FOREIGN KEY \(`(\w+)`\) REFERENCES `(\w+)` \(`(\w+)`\)/.match(line)
            index[:name] = match_data[1]
            index[:column] = match_data[2]
            index[:ref_table] = match_data[3]
            index[:ref_column] = match_data[4]
            @foreign_keys << index
          elsif match_data = /KEY `(\w+)` \((.*)\)/.match(line)
            index[:name] = match_data[1]
            index[:columns] = match_data[2].split(",").map { |col| col[/`(\w+)`/, 1] }
            index[:unique] = true if line =~ /UNIQUE/
            @indexes << index
          elsif match_data = /PRIMARY KEY .*\((.*)\)/.match(line)
            index[:primary] = true
            index[:columns] = match_data[1].split(",").map { |col| col.strip.gsub(/`/, "") }
            @indexes << index
          end
        end
      end
    end

    # True when the table has a column named exactly "id".
    def has_id?
      !!columns.find { |col| col[:name] == "id" }
    end

    # Returns the number that bounds pagination: MAX(id) for tables with an id
    # column, otherwise the row count. The table name is backtick-quoted like
    # every other query in this class.
    def count_for_pager
      query = has_id? ? 'MAX(id)' : 'COUNT(*)'
      @reader.mysql.query("SELECT #{query} FROM `#{name}`") do |res|
        return res.fetch_row[0].to_i
      end
    end

    # Returns the parameterised SELECT used for one page. Tables with an id
    # column are paged by id range, the others by LIMIT offset,count.
    def query_for_pager
      query = has_id? ? 'WHERE id >= ? AND id < ?' : 'LIMIT ?,?'
      "SELECT #{columns.map { |c| "`" + c[:name] + "`" }.join(", ")} FROM `#{name}` #{query}"
    end
  end

  attr_reader :mysql

  # Stores the connection parameters and connects immediately.
  def initialize(host = nil, user = nil, passwd = nil, db = nil, sock = nil, flag = nil)
    @host, @user, @passwd, @db, @sock, @flag = host, user, passwd, db, sock, flag
    connect
  end

  # Opens the connection and applies the session settings used for the dump.
  def connect
    @mysql = Mysql.connect(@host, @user, @passwd, @db, @sock, @flag)
    @mysql.query("SET NAMES utf8")
    @mysql.query("SET SESSION query_cache_type = OFF")
  end

  # Closes the current connection, ignoring any failure to do so, and opens a
  # fresh one.
  def reconnect
    @mysql.close rescue false
    connect
  end

  # Returns the memoized list of Table objects for the connected database.
  def tables
    @tables ||= @mysql.list_tables.map { |table| Table.new(self, table) }
  end

  # Yields every row of +table+ together with a 1-based running counter,
  # fetching +page_size+ rows (or ids) per query. Returns the number of rows
  # yielded, or nil when the table reports nothing to read. The prepared
  # statement is closed even if the block raises.
  def paginated_read(table, page_size)
    count = table.count_for_pager
    return if count < 1
    statement = @mysql.prepare(table.query_for_pager)
    begin
      counter = 0
      0.upto((count + page_size) / page_size) do |i|
        statement.execute(i * page_size, table.has_id? ? (i + 1) * page_size : page_size)
        while (row = statement.fetch)
          counter += 1
          yield(row, counter)
        end
      end
      counter
    ensure
      statement.close
    end
  end
end
# Common ancestor of the output backends; it defines no behaviour of its own.
class Writer; end
# Shared PostgreSQL DDL helpers: turns a column description hash (as built by
# MysqlReader::Table#load_columns) into PostgreSQL column definitions.
class PostgresWriter < Writer
  # Returns the quoted column name followed by its full type definition.
  def column_description(column)
    "#{PGconn.quote_ident(column[:name])} #{column_type_info(column)}"
  end

  # Returns the first word of the type definition (e.g. "integer", "boolean",
  # "character", "bytea"), or nil for an unknown type.
  def column_type(column)
    column_type_info(column).split(" ").first
  end

  # Builds "<type>[ DEFAULT ...][ NOT NULL][ CHECK (...)]" for +column+.
  #
  # A primary-key column named "id" always becomes an integer fed by the
  # "<table>_id_seq" sequence. For an unrecognised type a message is printed
  # and an empty string is returned.
  def column_type_info(column)
    if column[:primary_key] && column[:name] == "id"
      return "integer DEFAULT nextval('#{column[:table_name]}_#{column[:name]}_seq'::regclass) NOT NULL"
    end

    raw_default = column[:default]
    # Starts out as a quoted string literal; numeric and boolean branches below
    # replace it with an unquoted value.
    default = raw_default ? " DEFAULT '#{PGconn.escape(raw_default)}'" : nil
    null = column[:null] ? "" : " NOT NULL"
    check = nil

    type =
      case column[:type]
      when "varchar"
        default += "::character varying" if default
        "character varying(#{column[:length]})"
      when "integer"
        default = " DEFAULT #{raw_default.to_i}" if default
        "integer"
      when "tinyint"
        default = " DEFAULT #{raw_default.to_i}" if default
        "smallint"
      when "datetime"
        # Defaults are dropped for datetime columns.
        default = nil
        "timestamp without time zone"
      when "date"
        # Defaults are dropped for date columns.
        default = nil
        "date"
      when "boolean"
        default = " DEFAULT #{raw_default.to_i == 1 ? 'true' : 'false'}" if default
        "boolean"
      when "blob"
        "bytea"
      when "text", "mediumtext", "longtext"
        "text"
      when "double"
        default = " DEFAULT #{raw_default}" if default
        "double precision"
      when /^enum/
        # Strip only the surrounding "enum(" ... ")" so that values which
        # themselves contain "enum" or parentheses survive intact.
        values = column[:type].sub(/\Aenum\(/, '').sub(/\)\z/, '')
        # Each value still carries its two quote characters, hence the "- 2".
        widest = values.split(',').map { |value| value.size - 2 }.max
        # The default stays a quoted literal, and the CHECK is emitted as a
        # column constraint after DEFAULT / NOT NULL so the statement is valid.
        check = " CHECK (#{PGconn.quote_ident(column[:name])} IN (#{values}))"
        "character varying(#{widest})"
      when "float"
        default = " DEFAULT #{raw_default.to_f}" if default
        "real"
      when "decimal"
        default = " DEFAULT #{raw_default}" if default
        "numeric(#{column[:length] || 10}, #{column[:decimals] || 0})"
      when "timestamp"
        default = " DEFAULT CURRENT_TIMESTAMP" if raw_default == "CURRENT_TIMESTAMP"
        "timestamp without time zone"
      when "time"
        # Any MySQL default is replaced by `now`.
        # NOTE(review): confirm PostgreSQL accepts the unquoted `now` here.
        default = " DEFAULT now" if default
        "time without time zone"
      else
        puts "Unknown #{column.inspect}"
        return ""
      end

    "#{type}#{default}#{null}#{check}"
  end
end
# Writes the converted schema and data to a SQL dump file instead of a live
# PostgreSQL connection.
class PostgresFileWriter < PostgresWriter
  # Opens +file+ for writing and emits the session preamble. The handle stays
  # open until #close is called.
  def initialize(file)
    @f = File.open(file, "w+")
    @f << <<-EOF
-- MySQL 2 PostgreSQL dump\n
SET client_encoding = 'UTF8';
SET standard_conforming_strings = off;
SET check_function_bodies = false;
SET client_min_messages = warning;
    EOF
  end

  # Emits, in order: the id sequence (only when the table has a primary-key
  # column named "id"), DROP/CREATE TABLE including the primary-key
  # constraint, and DROP/CREATE for every non-primary index.
  def write_table(table)
    primary_key = nil
    maxval = nil
    columns = table.columns.map do |column|
      if column[:primary_key] && column[:name] == "id"
        primary_key = column[:name]
        maxval = column[:maxval] < 1 ? 1 : column[:maxval] + 1
      end
      " " + column_description(column)
    end.join(",\n")

    if primary_key
      @f << <<-EOF
--
-- Name: #{table.name}_#{primary_key}_seq; Type: SEQUENCE; Schema: public
--
DROP SEQUENCE IF EXISTS #{table.name}_#{primary_key}_seq CASCADE;
CREATE SEQUENCE #{table.name}_#{primary_key}_seq
INCREMENT BY 1
NO MAXVALUE
NO MINVALUE
CACHE 1;
SELECT pg_catalog.setval('#{table.name}_#{primary_key}_seq', #{maxval}, true);
      EOF
    end

    @f << <<-EOF
-- Table: #{table.name}
-- DROP TABLE #{table.name};
DROP TABLE IF EXISTS #{PGconn.quote_ident(table.name)} CASCADE;
CREATE TABLE #{PGconn.quote_ident(table.name)} (
    EOF
    @f << columns
    if primary_index = table.indexes.find { |index| index[:primary] }
      @f << ",\n CONSTRAINT #{table.name}_pkey PRIMARY KEY(#{primary_index[:columns].map { |col| PGconn.quote_ident(col) }.join(", ")})"
    end
    @f << <<-EOF
\n)
WITHOUT OIDS;
    EOF

    table.indexes.each do |index|
      next if index[:primary]
      unique = index[:unique] ? "UNIQUE " : nil
      @f << <<-EOF
DROP INDEX IF EXISTS #{PGconn.quote_ident(index[:name])} CASCADE;
CREATE #{unique}INDEX #{PGconn.quote_ident(index[:name])} ON #{PGconn.quote_ident(table.name)} (#{index[:columns].map { |col| PGconn.quote_ident(col) }.join(", ")});
      EOF
    end
  end

  # Intentionally empty: this writer emits indexes from #write_table.
  def write_indexes(table)
  end

  # Emits one ALTER TABLE ... ADD FOREIGN KEY statement per foreign key.
  def write_constraints(table)
    table.foreign_keys.each do |key|
      @f << "ALTER TABLE #{PGconn.quote_ident(table.name)} ADD FOREIGN KEY (#{PGconn.quote_ident(key[:column])}) REFERENCES #{PGconn.quote_ident(key[:ref_table])}(#{PGconn.quote_ident(key[:ref_column])});\n"
    end
  end

  # Emits a COPY ... FROM stdin block holding every row of +table+, followed
  # by the end-of-data marker and a VACUUM statement. Values are encoded by
  # the PostgreSQL column type (see #copy_value), using the same rules as
  # PostgresDbWriter#write_contents.
  def write_contents(table, reader)
    @f << <<-EOF
--
-- Data for Name: #{table.name}; Type: TABLE DATA; Schema: public
--
COPY "#{table.name}" (#{table.columns.map { |column| PGconn.quote_ident(column[:name]) }.join(", ")}) FROM stdin;
    EOF
    columns = table.columns
    # Resolved once per table rather than once per cell.
    pg_types = columns.map { |column| column_type(column) }
    reader.paginated_read(table, 1000) do |row, _counter|
      columns.each_with_index do |column, index|
        row[index] = copy_value(column, pg_types[index], row[index])
      end
      @f << row.join("\t") + "\n"
    end
    @f << "\\.\n\n"
    @f << "VACUUM FULL ANALYZE #{PGconn.quote_ident(table.name)};\n\n"
  end

  # Closes the dump file.
  def close
    @f.close
  end

  private

  # Encodes one cell for COPY text format.
  #
  # column  - the column description hash (its :type is the MySQL-side type).
  # pg_type - the first word of the PostgreSQL type, as returned by column_type.
  # value   - the raw value fetched from MySQL.
  def copy_value(column, pg_type, value)
    return '\N' unless value
    if column[:type] == "time" && value.respond_to?(:hour)
      return "%02d:%02d:%02d" % [value.hour, value.minute, value.second]
    end
    if value.is_a?(Mysql::Time)
      # Zero dates are replaced with the Unix epoch.
      return value.to_s.gsub('0000-00-00 00:00', '1970-01-01 00:00')
    end
    if pg_type == "boolean"
      return 't' if value == 1
      return 'f' if value == 0
      return value
    end
    return value unless value.is_a?(String)
    return copy_escape_bytea(value) if pg_type == "bytea"
    value.gsub(/\\/, '\\\\\\').gsub(/\n/,'\n').gsub(/\t/,'\t').gsub(/\r/,'\r').gsub(/\0/, '')
  end

  # Encodes binary data for a bytea column inside COPY text format. COPY
  # consumes one level of backslashes and the bytea parser the next, so a
  # literal backslash becomes four backslashes and every non-printable byte
  # becomes two backslashes followed by its three-digit octal code.
  def copy_escape_bytea(value)
    value.unpack("C*").map do |byte|
      if byte == 92
        "\\\\\\\\"
      elsif byte < 32 || byte > 126
        "\\\\%03o" % byte
      else
        byte.chr
      end
    end.join
  end
end
# Writes the converted schema and data straight into a PostgreSQL database
# through a PGconn connection.
class PostgresDbWriter < PostgresWriter
  # Connects and applies the session settings used for the import.
  # +database+ may be given as "dbname:schema" (see #connection).
  def initialize(hostname, login, password, database, port = 5432)
    connection(hostname, login, password, database, port)
    @conn.exec("SET client_encoding = 'UTF8'")
    @conn.exec("SET standard_conforming_strings = off") if @conn.server_version >= 80200
    @conn.exec("SET check_function_bodies = false")
    @conn.exec("SET client_min_messages = warning")
  end

  # Opens @conn. When +database+ has the form "dbname:schema", the part after
  # the colon becomes the session search_path.
  def connection(hostname, login, password, database, port)
    database, schema = database.split(":")
    @conn = PGconn.open('host' => hostname, 'user' => login, 'password' => password, 'dbname' => database, 'port' => port.to_s)
    @conn.exec("SET search_path TO #{PGconn.quote_ident(schema)}") if schema
  end

  # True when pg_class contains a relation named +relname+.
  def exists?(relname)
    rc = @conn.select_one("SELECT COUNT(*) FROM pg_class WHERE relname = #{PGconn.quote(relname)}")
    (!rc.nil?) && (!rc.empty?) && (rc.first.to_i > 0)
  end

  # Recreates the id sequence (only when the table has a primary-key column
  # named "id") and the table itself. Servers reporting a version below 80200
  # get an explicit existence check instead of DROP ... IF EXISTS. A failing
  # CREATE TABLE is printed and re-raised.
  def write_table(table)
    primary_key = nil
    maxval = nil
    columns = table.columns.map do |column|
      if column[:primary_key] && column[:name] == "id"
        primary_key = column[:name]
        maxval = column[:maxval] < 1 ? 1 : column[:maxval] + 1
      end
      " " + column_description(column)
    end.join(",\n")

    if primary_key
      sequence = "#{table.name}_#{primary_key}_seq"
      if @conn.server_version < 80200
        @conn.exec("DROP SEQUENCE #{sequence} CASCADE") if exists?(sequence)
      else
        @conn.exec("DROP SEQUENCE IF EXISTS #{sequence} CASCADE")
      end
      @conn.exec <<-EOF
CREATE SEQUENCE #{sequence}
INCREMENT BY 1
NO MAXVALUE
NO MINVALUE
CACHE 1
      EOF
      @conn.exec "SELECT pg_catalog.setval('#{sequence}', #{maxval}, true)"
    end

    if @conn.server_version < 80200
      @conn.exec "DROP TABLE #{PGconn.quote_ident(table.name)} CASCADE;" if exists?(table.name)
    else
      @conn.exec "DROP TABLE IF EXISTS #{PGconn.quote_ident(table.name)} CASCADE;"
    end

    create_sql = "CREATE TABLE #{PGconn.quote_ident(table.name)} (\n" + columns + "\n)\nWITHOUT OIDS;"
    begin
      @conn.exec(create_sql)
    rescue StandardError
      puts "Error: \n#{create_sql}"
      raise
    end
    puts "Created table #{table.name}"
  end

  # Adds the primary-key constraint, recreates every other index and vacuums
  # the table. Errors are reported and swallowed so that the remaining tables
  # are still processed; only StandardError is caught, so interrupts and
  # process exits still propagate.
  def write_indexes(table)
    if primary_index = table.indexes.find { |index| index[:primary] }
      @conn.exec("ALTER TABLE #{PGconn.quote_ident(table.name)} ADD CONSTRAINT #{table.name}_pkey PRIMARY KEY(#{primary_index[:columns].map { |col| PGconn.quote_ident(col) }.join(", ")})")
    end
    table.indexes.each do |index|
      next if index[:primary]
      unique = index[:unique] ? "UNIQUE " : nil
      if @conn.server_version < 80200
        @conn.exec("DROP INDEX #{PGconn.quote_ident(index[:name])} CASCADE;") if exists?(index[:name])
      else
        @conn.exec("DROP INDEX IF EXISTS #{PGconn.quote_ident(index[:name])} CASCADE;")
      end
      @conn.exec("CREATE #{unique}INDEX #{PGconn.quote_ident(index[:name])} ON #{PGconn.quote_ident(table.name)} (#{index[:columns].map { |col| PGconn.quote_ident(col) }.join(", ")});")
    end
    @conn.exec("VACUUM FULL ANALYZE #{PGconn.quote_ident(table.name)}")
    puts "Indexed table #{table.name}"
  rescue StandardError => e
    puts "Couldn't create indexes on #{table.name} (#{table.indexes.inspect})"
    puts e
    puts e.backtrace[0,3].join("\n")
  end

  # Adds every foreign key of +table+. A failing statement is printed together
  # with the error and the remaining keys are still attempted.
  def write_constraints(table)
    table.foreign_keys.each do |key|
      key_sql = "ALTER TABLE #{PGconn.quote_ident(table.name)} ADD FOREIGN KEY (#{PGconn.quote_ident(key[:column])}) REFERENCES #{PGconn.quote_ident(key[:ref_table])}(#{PGconn.quote_ident(key[:ref_column])})"
      begin
        @conn.exec(key_sql)
      rescue StandardError => e
        puts "Error: \n#{key_sql}\n#{e}"
      end
    end
  end

  # Streams every row of +table+ through COPY ... FROM stdin. The COPY is ended
  # and restarted after every 5000 rows; progress is printed as "." per 1000
  # rows and "*" per 5000 rows, followed by the row count and elapsed time.
  def write_contents(table, reader)
    started_at = Time.now
    columns = table.columns
    copy_line = "COPY #{PGconn.quote_ident(table.name)} (#{columns.map { |column| PGconn.quote_ident(column[:name]) }.join(", ")}) FROM stdin;"
    @conn.exec(copy_line)
    print "Loading #{table.name}: "
    STDOUT.flush
    # Resolved once per table rather than once per cell.
    pg_types = columns.map { |column| column_type(column) }
    total = reader.paginated_read(table, 1000) do |row, counter|
      columns.each_with_index do |column, index|
        row[index] = copy_value(column, pg_types[index], row[index])
      end
      @conn.putline(row.join("\t") + "\n")
      if counter % 5000 == 0
        print "*"
        STDOUT.flush
        @conn.endcopy
        @conn.exec(copy_line)
      elsif counter % 1000 == 0
        print "."
        STDOUT.flush
      end
    end
    # Round to whole seconds first, then split, so 90 s prints as "1min 30s".
    elapsed = (Time.now - started_at).round
    puts " #{total} (#{elapsed / 60}min #{elapsed % 60}s)"
    @conn.endcopy
  end

  # Closes the database connection.
  def close
    @conn.close
  end

  private

  # Encodes one cell for COPY text format.
  #
  # column  - the column description hash (its :type is the MySQL-side type).
  # pg_type - the first word of the PostgreSQL type, as returned by column_type.
  # value   - the raw value fetched from MySQL.
  def copy_value(column, pg_type, value)
    return '\N' unless value
    if column[:type] == "time"
      return "%02d:%02d:%02d" % [value.hour, value.minute, value.second]
    end
    if value.is_a?(Mysql::Time)
      # Zero dates are replaced with the Unix epoch.
      return value.to_s.gsub('0000-00-00 00:00', '1970-01-01 00:00')
    end
    if pg_type == "boolean"
      return 't' if value == 1
      return 'f' if value == 0
      return value
    end
    return value unless value.is_a?(String)
    return copy_escape_bytea(value) if pg_type == "bytea"
    value.gsub(/\\/, '\\\\\\').gsub(/\n/,'\n').gsub(/\t/,'\t').gsub(/\r/,'\r').gsub(/\0/, '')
  end

  # Encodes binary data for a bytea column inside COPY text format. COPY
  # consumes one level of backslashes and the bytea parser the next, so a
  # literal backslash becomes four backslashes and every non-printable byte
  # becomes two backslashes followed by its three-digit octal code. This
  # replaces PGconn.quote, which produces a quoted SQL literal rather than
  # COPY data.
  def copy_escape_bytea(value)
    value.unpack("C*").map do |byte|
      if byte == 92
        "\\\\\\\\"
      elsif byte < 32 || byte > 126
        "\\\\%03o" % byte
      else
        byte.chr
      end
    end.join
  end
end
# Drives a conversion: reads the table list from +reader+ and hands each table
# to +writer+ in four passes (tables, contents, indexes, constraints).
class Converter
  attr_reader :reader, :writer

  # reader  - object responding to #tables; it is also passed on to
  #           writer.write_contents.
  # writer  - object responding to #write_table, #write_contents,
  #           #write_indexes, #write_constraints and #close.
  # options - :exclude_tables => table names to skip (default: none)
  #           :only_tables    => a name or list of names; when given, only
  #                              these tables are converted
  #           :supress_data   => truthy to skip the data pass; the correctly
  #                              spelled :suppress_data is accepted as well
  def initialize(reader, writer, options = {})
    @reader = reader
    @writer = writer
    @exclude_tables = options[:exclude_tables] || []
    @only_tables = options[:only_tables] ? Array(options[:only_tables]) : nil
    @supress_data = options[:supress_data] || options[:suppress_data]
  end

  # Runs the conversion and prints a timing summary. The writer is closed even
  # when a step raises, so the output file or connection is not left open;
  # the error itself still propagates to the caller.
  def convert
    started_at = Time.now
    begin
      tables = reader.tables.
        reject { |table| @exclude_tables.include?(table.name) }.
        select { |table| @only_tables ? @only_tables.include?(table.name) : true }

      tables.each { |table| writer.write_table(table) }
      tables_done_at = Time.now

      tables.each { |table| writer.write_contents(table, reader) } unless @supress_data
      data_done_at = Time.now

      tables.each { |table| writer.write_indexes(table) }
      tables.each { |table| writer.write_constraints(table) }
    ensure
      writer.close
    end
    finished_at = Time.now
    puts "Table creation #{((tables_done_at - started_at) / 60).round} min, loading #{((data_done_at - tables_done_at) / 60).round} min, indexing #{((finished_at - data_done_at) / 60).round} min, total #{((finished_at - started_at) / 60).round} min"
  end
end
# Script entry point: dump three tables of the local `mdvbz` MySQL database
# into output.sql.
#
# Alternative setups kept for reference:
#writer = PostgresFileWriter.new($ARGV[2] || "output.sql")
#writer = PostgresDbWriter.new('localhost', 'prophotos', '123', 'prophotos_development:old')
#converter = Converter.new(reader, writer, :only_tables => %w(bugs profiles rpmpkg))
#converter = Converter.new(reader, writer, :only_tables => %w(rpmpkg))
selected_tables = %w(mdvbz_bugs mdvbz_profiles mdvbz_rpmpkg)
reader = MysqlReader.new('localhost', 'mdvbz', 'coin123', 'mdvbz')
writer = PostgresFileWriter.new("output.sql")
converter = Converter.new(reader, writer, :only_tables => selected_tables)
converter.convert
|