Subversion

helios_wp3

[/] [trunk/] [d2r/] [bugzilla/] [mysql2psql] - Rev 227 Go to most recent revision

Compare with Previous - Blame


#!/usr/bin/env ruby

require 'rubygems'
require 'mysql'
gem "postgres"
require 'postgres'

# Reads table metadata and row data from a MySQL database through the
# `mysql` gem.
#
# A reader owns one connection (exposed through #mysql) and hands out
# MysqlReader::Table objects that run their queries on that connection.
class MysqlReader
  # Empty placeholder class; nothing in this file instantiates it.
  class Field
  end

  # Describes a single MySQL table: its columns, indexes, foreign keys and
  # the SQL used to page through its rows.
  class Table
    # Suffixes of the Mysql::Field::TYPE_* constants that .types translates
    # into lower-case type names.
    TYPE_NAMES = %w(tiny enum decimal short long float double null timestamp longlong int24 date time datetime year set blob string var_string char).freeze

    attr_reader :name

    # reader - the MysqlReader whose connection is used for every query.
    # name   - the table name (String).
    def initialize(reader, name)
      @reader = reader
      @name = name
    end

    # Returns a Hash mapping numeric Mysql::Field type codes to type names.
    #
    # The map is built on first use with const_get (no eval), so the class
    # can be loaded before the MySQL driver is. Code 246 is additionally
    # mapped to "decimal" (presumably MySQL's NEWDECIMAL code -- confirm).
    def self.types
      @types ||= begin
        map = TYPE_NAMES.inject({}) do |list, type|
          list[Mysql::Field.const_get("TYPE_#{type.upcase}")] = type
          list
        end
        map[246] = "decimal"
        map
      end
    end

    # Returns the memoized Array of column description Hashes
    # (see #load_columns for the keys).
    def columns
      @columns ||= load_columns
    end

    # Collapses a MySQL column type (as printed by EXPLAIN) into the
    # simplified type name the writers switch on. Types that match none of
    # the rules are returned unchanged.
    def convert_type(type)
      case type
      when "tinyint(1)"
        "boolean"
      when /tinyint/
        "tinyint"
      when /int/
        "integer"
      when /varchar/
        "varchar"
      when /decimal/
        "decimal"
      else
        type
      end
    end

    # Queries the table layout and returns one Hash per column with the keys
    # :name, :table_name, :type, :length, :decimals, :null, :primary_key,
    # optionally :default, and, for primary-key columns, :maxval
    # (the result of `max(column) + 1`).
    #
    # The reader reconnects before the layout is queried.
    def load_columns
      @reader.reconnect
      fields = []
      @reader.mysql.query("EXPLAIN `#{name}`") do |res|
        while row = res.fetch_row
          fields << describe_column(row)
        end
      end

      fields.select { |field| field[:primary_key] }.each do |field|
        @reader.mysql.query("SELECT max(`#{field[:name]}`) + 1 FROM `#{name}`") do |res|
          field[:maxval] = res.fetch_row[0].to_i
        end
      end
      fields
    end

    # Returns the Array of index Hashes, loading them on first use.
    def indexes
      load_indexes unless @indexes
      @indexes
    end

    # Returns the Array of foreign-key Hashes, loading them on first use.
    def foreign_keys
      load_indexes unless @foreign_keys
      @foreign_keys
    end

    # Parses the output of SHOW CREATE TABLE and fills @indexes and
    # @foreign_keys.
    #
    # Only lines containing " KEY " are considered. Foreign keys are
    # recognised only in the single-column form matched by the CONSTRAINT
    # pattern below.
    def load_indexes
      @indexes = []
      @foreign_keys = []

      @reader.mysql.query("SHOW CREATE TABLE `#{name}`") do |result|
        ddl = result.fetch_row[1]
        ddl.split(/\n/).each do |line|
          next unless line =~ / KEY /
          if found = /CONSTRAINT `(\w+)` FOREIGN KEY \(`(\w+)`\) REFERENCES `(\w+)` \(`(\w+)`\)/.match(line)
            @foreign_keys << {
              :name => found[1],
              :column => found[2],
              :ref_table => found[3],
              :ref_column => found[4]
            }
          elsif found = /KEY `(\w+)` \((.*)\)/.match(line)
            index = {
              :name => found[1],
              :columns => found[2].split(",").map { |col| col[/`(\w+)`/, 1] }
            }
            index[:unique] = true if line =~ /UNIQUE/
            @indexes << index
          elsif found = /PRIMARY KEY .*\((.*)\)/.match(line)
            @indexes << {
              :primary => true,
              :columns => found[1].split(",").map { |col| col.strip.gsub(/`/, "") }
            }
          end
        end
      end
    end

    # True when the table has a column literally named "id".
    def has_id?
      columns.any? { |col| col[:name] == "id" }
    end

    # Returns the number that bounds paging: MAX(id) when the table has an
    # id column, COUNT(*) otherwise.
    def count_for_pager
      query = has_id? ? 'MAX(id)' : 'COUNT(*)'
      # The table name is backtick-quoted like in every other query here.
      @reader.mysql.query("SELECT #{query} FROM `#{name}`") do |res|
        return res.fetch_row[0].to_i
      end
    end

    # Returns the SELECT used by MysqlReader#paginated_read. It has two
    # placeholders: an id range (lower bound inclusive, upper exclusive) when
    # the table has an id column, otherwise LIMIT offset,count.
    def query_for_pager
      query = has_id? ? 'WHERE id >= ? AND id < ?' : 'LIMIT ?,?'
      "SELECT #{columns.map { |c| "`" + c[:name] + "`" }.join(", ")} FROM `#{name}` #{query}"
    end

    private

    # Builds the description Hash for one EXPLAIN row
    # (row[0] name, row[1] type, row[2] null, row[3] key, row[4] default).
    #
    # :length is computed from this row only, so a column without a
    # "(n)" / "(n,m)" suffix gets nil instead of the previous column's length.
    def describe_column(row)
      type = row[1]
      length = type[/\((\d+)(?:,\d+)?\)/, 1]
      desc = {
        :name => row[0],
        :table_name => name,
        :type => convert_type(type),
        :length => length && length.to_i,
        :decimals => type[/\((\d+),(\d+)\)/, 2],
        :null => row[2] == "YES",
        :primary_key => row[3] == "PRI"
      }
      desc[:default] = row[4] unless row[4].nil? || row[4].empty?
      desc
    end
  end

  attr_reader :mysql

  # Stores the connection parameters (passed positionally to Mysql.connect)
  # and connects immediately.
  def initialize(host = nil, user = nil, passwd = nil, db = nil, sock = nil, flag = nil)
    @host, @user, @passwd, @db, @sock, @flag = host, user, passwd, db, sock, flag
    connect
  end

  # Opens the connection, switches it to utf8 and turns the session query
  # cache off.
  def connect
    @mysql = Mysql.connect(@host, @user, @passwd, @db, @sock, @flag)
    @mysql.query("SET NAMES utf8")
    @mysql.query("SET SESSION query_cache_type = OFF")
  end

  # Closes the current connection (errors while closing are ignored, the
  # connection may already be gone) and opens a new one.
  def reconnect
    begin
      @mysql.close
    rescue StandardError
      false
    end
    connect
  end

  # Returns the memoized Array of Table objects, one per table in the
  # database.
  def tables
    @tables ||= @mysql.list_tables.map { |table| Table.new(self, table) }
  end

  # Yields every row of +table+ together with a running 1-based counter,
  # fetching the rows in pages of +page_size+.
  #
  # Returns the number of rows yielded, or nil when the pager count is
  # below 1. The prepared statement is closed even if the block raises.
  def paginated_read(table, page_size)
    count = table.count_for_pager
    return if count < 1
    statement = @mysql.prepare(table.query_for_pager)
    begin
      counter = 0
      0.upto((count + page_size) / page_size) do |i|
        # Second bind value: exclusive upper id bound, or the LIMIT row count.
        statement.execute(i * page_size, table.has_id? ? (i + 1) * page_size : page_size)
        while row = statement.fetch
          counter += 1
          yield(row, counter)
        end
      end
      counter
    ensure
      statement.close
    end
  end
end
 
# Common ancestor of all writers; carries no behaviour of its own.
class Writer
end


# Shared PostgreSQL logic: turns the column description Hashes produced by
# MysqlReader::Table#columns into PostgreSQL column definitions.
class PostgresWriter < Writer
  # Returns the full column clause: quoted column name followed by the type
  # information from #column_type_info.
  def column_description(column)
    "#{PGconn.quote_ident(column[:name])} #{column_type_info(column)}"
  end

  # Returns the first word of #column_type_info, e.g. "boolean", "bytea" or
  # "character" (nil for unknown types, whose type info is empty).
  def column_type(column)
    column_type_info(column).split(" ").first
  end

  # Returns the PostgreSQL type of +column+ followed by its DEFAULT,
  # NOT NULL and (for enums) CHECK clauses.
  #
  # A primary-key column named "id" always becomes an integer fed from the
  # sequence "<table>_id_seq". For a type that is not recognised a notice
  # is printed and an empty String is returned.
  def column_type_info(column)
    if column[:primary_key] && column[:name] == "id"
      return "integer DEFAULT nextval('#{column[:table_name]}_#{column[:name]}_seq'::regclass) NOT NULL"
    end

    raw_default = column[:default]
    # Generic form: the default as a quoted literal. Individual types below
    # replace or drop it.
    default = raw_default ? " DEFAULT '#{PGconn.escape(raw_default)}'" : nil
    null = column[:null] ? "" : " NOT NULL"
    check = nil

    type =
      case column[:type]
      when "varchar"
        default += "::character varying" if default
        "character varying(#{column[:length]})"
      when "integer"
        default = " DEFAULT #{raw_default.to_i}" if default
        "integer"
      when "tinyint"
        default = " DEFAULT #{raw_default.to_i}" if default
        "smallint"
      when "datetime"
        default = nil
        "timestamp without time zone"
      when "date"
        default = nil
        "date"
      when "boolean"
        default = " DEFAULT #{raw_default.to_i == 1 ? 'true' : 'false'}" if default
        "boolean"
      when "blob"
        "bytea"
      when "mediumtext", "longtext", "text"
        "text"
      when "double"
        default = " DEFAULT #{raw_default}" if default
        "double precision"
      when /^enum/
        # Enums become a varchar wide enough for the longest value plus a
        # CHECK constraint. The constraint is a column constraint placed
        # after DEFAULT / NOT NULL, and the default stays a quoted literal,
        # so the generated clause is valid SQL.
        # NOTE(review): values containing commas or parentheses are not
        # handled by this parsing.
        values = column[:type].gsub(/enum|\(|\)/, '')
        widest = values.split(',').map { |value| value.size - 2 }.max
        check = " CHECK (#{PGconn.quote_ident(column[:name])} IN (#{values}))"
        "character varying(#{widest})"
      when "float"
        default = " DEFAULT #{raw_default.to_f}" if default
        "real"
      when "decimal"
        default = " DEFAULT #{raw_default}" if default
        "numeric(#{column[:length] || 10}, #{column[:decimals] || 0})"
      when "timestamp"
        default = " DEFAULT CURRENT_TIMESTAMP" if raw_default == "CURRENT_TIMESTAMP"
        "timestamp without time zone"
      when "time"
        # Any MySQL default is replaced by "current time". LOCALTIME is used
        # because a bare `now` is not a valid default expression.
        default = " DEFAULT LOCALTIME" if default
        "time without time zone"
      else
        puts "Unknown #{column.inspect}"
        return ""
      end

    "#{type}#{default}#{null}#{check}"
  end

end
 
# Writes the converted schema and data as a SQL dump file.
class PostgresFileWriter < PostgresWriter
  # Opens +file+ for writing and emits the session settings header.
  # The file stays open until #close is called.
  def initialize(file)
    @f = File.open(file, "w+")
    @f << <<-EOF
-- MySQL 2 PostgreSQL dump\n
SET client_encoding = 'UTF8';
SET standard_conforming_strings = off;
SET check_function_bodies = false;
SET client_min_messages = warning;

EOF
  end

  # Emits the DDL for +table+: an optional sequence for a primary key named
  # "id", DROP/CREATE TABLE with the primary-key constraint inline, and one
  # DROP/CREATE INDEX pair per secondary index.
  def write_table(table)
    serial_key = nil
    start_value = nil

    column_sql = table.columns.map do |column|
      if column[:primary_key] && column[:name] == "id"
        serial_key = column[:name]
        # column[:maxval] is what the reader computed for this key.
        start_value = column[:maxval] < 1 ? 1 : column[:maxval] + 1
      end
      "  " + column_description(column)
    end.join(",\n")

    if serial_key
      sequence = "#{table.name}_#{serial_key}_seq"
      @f << <<-EOF
--
-- Name: #{sequence}; Type: SEQUENCE; Schema: public
--

DROP SEQUENCE IF EXISTS #{sequence} CASCADE;

CREATE SEQUENCE #{sequence}
    INCREMENT BY 1
    NO MAXVALUE
    NO MINVALUE
    CACHE 1;


SELECT pg_catalog.setval('#{sequence}', #{start_value}, true);

      EOF
    end

    quoted_table = PGconn.quote_ident(table.name)
    @f << <<-EOF
-- Table: #{table.name}

-- DROP TABLE #{table.name};
DROP TABLE IF EXISTS #{quoted_table} CASCADE;

CREATE TABLE #{quoted_table} (
EOF

    @f << column_sql

    if primary_index = table.indexes.find { |index| index[:primary] }
      @f << ",\n  CONSTRAINT #{table.name}_pkey PRIMARY KEY(#{quoted_list(primary_index[:columns])})"
    end

    @f << "\n)\nWITHOUT OIDS;\n"

    table.indexes.each do |index|
      next if index[:primary]
      unique = index[:unique] ? "UNIQUE " : nil
      quoted_index = PGconn.quote_ident(index[:name])
      @f << "DROP INDEX IF EXISTS #{quoted_index} CASCADE;\n"
      @f << "CREATE #{unique}INDEX #{quoted_index} ON #{quoted_table} (#{quoted_list(index[:columns])});\n"
    end

  end

  # Intentionally empty: indexes are already emitted by #write_table.
  def write_indexes(table)
  end

  # Emits one ALTER TABLE ... ADD FOREIGN KEY statement per foreign key.
  def write_constraints(table)
    table.foreign_keys.each do |key|
      @f << "ALTER TABLE #{PGconn.quote_ident(table.name)} ADD FOREIGN KEY (#{PGconn.quote_ident(key[:column])}) REFERENCES #{PGconn.quote_ident(key[:ref_table])}(#{PGconn.quote_ident(key[:ref_column])});\n"
    end
  end


  # Emits a COPY ... FROM stdin block containing every row that +reader+
  # yields for +table+, followed by VACUUM FULL ANALYZE.
  def write_contents(table, reader)
    @f << <<-EOF
--
-- Data for Name: #{table.name}; Type: TABLE DATA; Schema: public
--

COPY #{PGconn.quote_ident(table.name)} (#{quoted_list(table.columns.map { |column| column[:name] })}) FROM stdin;
EOF

    reader.paginated_read(table, 1000) do |row, _counter|
      table.columns.each_with_index do |column, index|
        row[index] = copy_value(column, row[index])
      end
      @f << row.join("\t") + "\n"
    end
    @f << "\\.\n\n"
    @f << "VACUUM FULL ANALYZE #{PGconn.quote_ident(table.name)};\n\n"
  end

  # Closes the dump file.
  def close
    @f.close
  end

  private

  # Quotes every identifier in +names+ and joins them with ", ".
  def quoted_list(names)
    names.map { |name| PGconn.quote_ident(name) }.join(", ")
  end

  # Converts one field value into its COPY text representation.
  #
  # Time handling, the zero-date substitution and NUL stripping follow what
  # PostgresDbWriter#write_contents does, so both writers emit the same data.
  def copy_value(column, value)
    return '\N' unless value

    if value.is_a?(Mysql::Time)
      if column[:type] == "time"
        return "%02d:%02d:%02d" % [value.hour, value.minute, value.second]
      end
      # Replace MySQL zero dates by the epoch.
      return value.to_s.gsub('0000-00-00 00:00', '1970-01-01 00:00')
    end

    if column[:type] == "char"
      value = value == 1 ? 't' : (value == 0 ? 'f' : value)
    end

    if value.is_a?(String)
      # NOTE(review): column[:type] holds the reader-side type name (e.g.
      # "blob"), so this "bytea" branch looks unreachable; PostgresDbWriter
      # tests column_type(column) instead -- confirm which is intended.
      if column[:type] == "bytea"
        value = PGconn.quote(value)
      else
        value = value.gsub(/\\/, '\\\\\\').gsub(/\n/,'\n').gsub(/\t/,'\t').gsub(/\r/,'\r').gsub(/\0/, '')
      end
    end
    value
  end
end
 
# Writes the converted schema and data straight into a PostgreSQL database.
class PostgresDbWriter < PostgresWriter
  # Connects and applies the session settings used for the import.
  #
  # database may be given as "dbname:schema" (see #connection).
  def initialize(hostname, login, password, database, port = 5432)
    connection(hostname, login, password, database, port)
    @conn.exec("SET client_encoding = 'UTF8'")
    @conn.exec("SET standard_conforming_strings = off") if @conn.server_version >= 80200
    @conn.exec("SET check_function_bodies = false")
    @conn.exec("SET client_min_messages = warning")
  end

  # Opens the connection. When +database+ has the form "dbname:schema" the
  # search_path is switched to that schema.
  def connection(hostname, login, password, database, port)
    database, schema = database.split(":")
    @conn = PGconn.open('host' => hostname, 'user' => login, 'password' => password, 'dbname' => database, 'port' => port.to_s)
    @conn.exec("SET search_path TO #{PGconn.quote_ident(schema)}") if schema
  end

  # True when pg_class contains a relation called +relname+.
  def exists?(relname)
    rc = @conn.select_one("SELECT COUNT(*) FROM pg_class WHERE relname = #{PGconn.quote(relname)}")
    (!rc.nil?) && (!rc.empty?) && (rc.first.to_i > 0)
  end

  # (Re)creates +table+: an optional sequence for a primary key named "id",
  # then DROP/CREATE TABLE. The primary-key constraint and the indexes are
  # added later by #write_indexes.
  #
  # Prints the failing statement and re-raises when CREATE TABLE fails.
  def write_table(table)
    serial_key = nil
    start_value = nil

    column_sql = table.columns.map do |column|
      if column[:primary_key] && column[:name] == "id"
        serial_key = column[:name]
        # column[:maxval] is what the reader computed for this key.
        start_value = column[:maxval] < 1 ? 1 : column[:maxval] + 1
      end
      "  " + column_description(column)
    end.join(",\n")

    if serial_key
      sequence = "#{table.name}_#{serial_key}_seq"
      drop_relation("SEQUENCE", sequence, sequence)
      @conn.exec <<-EOF
        CREATE SEQUENCE #{sequence}
        INCREMENT BY 1
        NO MAXVALUE
        NO MINVALUE
        CACHE 1
      EOF

      @conn.exec "SELECT pg_catalog.setval('#{sequence}', #{start_value}, true)"
    end

    drop_relation("TABLE", table.name)
    create_sql = "CREATE TABLE #{PGconn.quote_ident(table.name)} (\n" + column_sql + "\n)\nWITHOUT OIDS;"
    begin
      @conn.exec(create_sql)
    rescue StandardError
      puts "Error: \n#{create_sql}"
      raise
    end
    puts "Created table #{table.name}"

  end

  # Adds the primary-key constraint and recreates every secondary index of
  # +table+, then runs VACUUM FULL ANALYZE.
  #
  # Failures are reported on stdout and not re-raised. Only StandardError is
  # rescued so that an interrupt or exit request still stops the run.
  def write_indexes(table)
    quoted_table = PGconn.quote_ident(table.name)

    if primary_index = table.indexes.find { |index| index[:primary] }
      @conn.exec("ALTER TABLE #{quoted_table} ADD CONSTRAINT #{table.name}_pkey PRIMARY KEY(#{quoted_list(primary_index[:columns])})")
    end

    table.indexes.each do |index|
      next if index[:primary]
      unique = index[:unique] ? "UNIQUE " : nil
      drop_relation("INDEX", index[:name])
      @conn.exec("CREATE #{unique}INDEX #{PGconn.quote_ident(index[:name])} ON #{quoted_table} (#{quoted_list(index[:columns])});")
    end

    @conn.exec("VACUUM FULL ANALYZE #{quoted_table}")
    puts "Indexed table #{table.name}"
  rescue StandardError => e
    puts "Couldn't create indexes on #{table.name} (#{table.indexes.inspect})"
    puts e
    puts e.backtrace[0,3].join("\n")
  end

  # Adds every foreign key of +table+. A failing statement is reported on
  # stdout and the remaining keys are still attempted.
  def write_constraints(table)
    table.foreign_keys.each do |key|
      key_sql = "ALTER TABLE #{PGconn.quote_ident(table.name)} ADD FOREIGN KEY (#{PGconn.quote_ident(key[:column])}) REFERENCES #{PGconn.quote_ident(key[:ref_table])}(#{PGconn.quote_ident(key[:ref_column])})"
      begin
        @conn.exec(key_sql)
      rescue StandardError => e
        puts "Error: \n#{key_sql}\n#{e}"
      end
    end
  end

  # Streams every row of +table+ into PostgreSQL with COPY ... FROM stdin.
  #
  # The COPY is ended and restarted every 5000 rows. Progress is printed as
  # "." per 1000 rows and "*" per 5000 rows, followed by the row count and
  # the elapsed time.
  def write_contents(table, reader)
    started = Time.now
    column_names = table.columns.map { |column| column[:name] }
    copy_line = "COPY #{PGconn.quote_ident(table.name)} (#{quoted_list(column_names)}) FROM stdin;"
    @conn.exec(copy_line)
    print "Loading #{table.name}: "
    STDOUT.flush

    loaded = reader.paginated_read(table, 1000) do |row, counter|
      table.columns.each_with_index do |column, index|
        row[index] = copy_value(column, row[index])
      end
      @conn.putline(row.join("\t") + "\n")

      if counter % 5000 == 0
        print "*"
        STDOUT.flush
        @conn.endcopy
        @conn.exec(copy_line)
      elsif counter % 1000 == 0
        print "."
        STDOUT.flush
      end
    end

    # Whole minutes plus remaining seconds (90 s is "1min 30s").
    minutes, seconds = (Time.now - started).round.divmod(60)
    puts " #{loaded} (#{minutes}min #{seconds}s)"
    @conn.endcopy
  end

  # Closes the database connection.
  def close
    @conn.close
  end

  private

  # Quotes every identifier in +names+ and joins them with ", ".
  def quoted_list(names)
    names.map { |name| PGconn.quote_ident(name) }.join(", ")
  end

  # Drops the relation if it exists. Servers older than 8.2 get an explicit
  # pg_class lookup instead of DROP ... IF EXISTS.
  #
  # kind     - "SEQUENCE", "TABLE" or "INDEX"
  # relname  - unquoted relation name used for the existence lookup
  # sql_name - the name as it is written into the DROP statement
  def drop_relation(kind, relname, sql_name = PGconn.quote_ident(relname))
    if @conn.server_version < 80200
      @conn.exec("DROP #{kind} #{sql_name} CASCADE") if exists?(relname)
    else
      @conn.exec("DROP #{kind} IF EXISTS #{sql_name} CASCADE")
    end
  end

  # Converts one field value into its COPY text representation.
  def copy_value(column, value)
    return '\N' unless value

    if column[:type] == "time"
      return "%02d:%02d:%02d" % [value.hour, value.minute, value.second]
    end

    if value.is_a?(Mysql::Time)
      # Replace MySQL zero dates by the epoch.
      return value.to_s.gsub('0000-00-00 00:00', '1970-01-01 00:00')
    end

    if column_type(column) == "boolean"
      return value == 1 ? 't' : (value == 0 ? 'f' : value)
    end

    return value unless value.is_a?(String)

    if column_type(column) == "bytea"
      PGconn.quote(value)
    else
      value.gsub(/\\/, '\\\\\\').gsub(/\n/,'\n').gsub(/\t/,'\t').gsub(/\r/,'\r').gsub(/\0/, '')
    end
  end
end
 
 
# Drives a conversion: reads the tables from +reader+ and feeds them to
# +writer+ in four phases (tables, contents, indexes, constraints).
class Converter
  attr_reader :reader, :writer

  # reader  - object responding to #tables
  # writer  - object responding to #write_table, #write_contents,
  #           #write_indexes, #write_constraints and #close
  # options - Hash:
  #           :exclude_tables - table name or Array of names to skip
  #           :only_tables    - table name or Array of names to restrict to
  #           :supress_data   - when true, no row data is written
  #                             (the spelling :suppress_data is accepted too)
  def initialize(reader, writer, options = {})
    @reader = reader
    @writer = writer
    # Array() makes a single name behave like a one-element list instead of
    # being matched as a substring.
    @exclude_tables = Array(options[:exclude_tables])
    @only_tables = options[:only_tables] ? Array(options[:only_tables]) : nil
    @supress_data = options[:supress_data] || options[:suppress_data]
  end

  # Runs the conversion and prints a timing summary.
  #
  # The writer is closed even when one of the phases raises; the summary is
  # printed only after a successful run.
  def convert
    started = Time.now
    data_started = index_started = nil

    begin
      tables = selected_tables

      tables.each { |table| writer.write_table(table) }

      data_started = Time.now
      tables.each { |table| writer.write_contents(table, reader) } unless @supress_data

      index_started = Time.now
      tables.each { |table| writer.write_indexes(table) }
      tables.each { |table| writer.write_constraints(table) }
    ensure
      writer.close
    end

    finished = Time.now
    puts "Table creation #{minutes(started, data_started)} min, loading #{minutes(data_started, index_started)} min, indexing #{minutes(index_started, finished)} min, total #{minutes(started, finished)} min"
  end

  private

  # Tables of the reader after applying :exclude_tables and :only_tables.
  def selected_tables
    reader.tables.
      reject { |table| @exclude_tables.include?(table.name) }.
      select { |table| @only_tables ? @only_tables.include?(table.name) : true }
  end

  # Elapsed time between two Time values, rounded to whole minutes.
  def minutes(from, to)
    ((to - from) / 60).round
  end
end


# Script entry point. Runs only when this file is executed directly, so the
# classes above can be loaded without starting a conversion.
#
# Usage: mysql2psql [output_file]     (output_file defaults to "output.sql")
#
# The MySQL connection settings can be overridden through the environment
# variables MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD and MYSQL_DATABASE.
# NOTE(review): the fallback credentials below are kept only so existing
# invocations keep working; they should not live in version control.
if __FILE__ == $0
  reader = MysqlReader.new(
    ENV['MYSQL_HOST'] || 'localhost',
    ENV['MYSQL_USER'] || 'mdvbz',
    ENV['MYSQL_PASSWORD'] || 'coin123',
    ENV['MYSQL_DATABASE'] || 'mdvbz'
  )
  writer = PostgresFileWriter.new(ARGV[0] || "output.sql")
  # Alternative: write straight into a database ("dbname:schema").
  #writer = PostgresDbWriter.new('localhost', 'prophotos', '123', 'prophotos_development:old')
  #converter = Converter.new(reader, writer, :only_tables => %w(bugs profiles rpmpkg))
  converter = Converter.new(reader, writer, :only_tables => %w(mdvbz_bugs mdvbz_profiles mdvbz_rpmpkg))
  #converter = Converter.new(reader, writer, :only_tables => %w(rpmpkg))
  converter.convert
end


Powered by WebSVN v1.61