目次
Python 3 で mysql.connector
を使って MySQL に接続した方法を紹介します。
環境設定
Ubuntu と Amazon Linux で試しました。
Ubuntu
Ubuntu 16.04.1 LTS でやりました。
次のようにして必要なパッケージをインストールします。
1 |
sudo apt-get install python3-mysql.connector |
Amazon Linux
Python 3 と その pip パッケージ はインストールされているものとします。 Python のバージョンは 3.4 です。
MySQL のサイトから ファイルをダウンロードしてインストールします。 Platform は Platform Independent を選びました。 また、ダウンロードしたバージョンは 2.1.3 でした。 私が (yum や pip ではインストールできませんでした、 きっとやり方はあると思うのですが。)
ダウンロードしたファイルを (scp
などを用いて) サーバにコピーします。 私はコピーしたファイルを /usr/local/src
に移動し、 root
ユーザになって展開しました。
そして、root
ユーザで次のコマンドを実行します。
1 2 |
cd /usr/local/src/mysql-connector-python-2.1.3 python3 setup.py install |
接続確認
python3
を実行して Python コンソール を開き、 次のコマンドを実行すると接続確認ができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import mysql.connector config = { 'user': 'app', 'password': 'app', 'host': 'rds.amazonaws.com', 'database': 'development_db', } cnx = mysql.connector.connect(**config) cur = cnx.cursor(buffered=True) cur.execute("SHOW STATUS LIKE 'Uptime'") print(cur.fetchone()) # => ('Uptime', '22073875') cur.close() # => True cnx.close() |
うまくいけば、 Uptime が表示されます。
コードで接続してみる
たとえば次のようなコードで、 SELECT
を実行できます。 Config クラス がないと動きません。 接続情報を直接書いて Config の import をなくせばOKです。 SQLインジェクションの可能性も残っています。 Python経験が浅いのでコードの質もきっと低いです。 さーせん。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
import mysql.connector from config.config import Config from app.model.sample import Sample class BaseMapper: @staticmethod def create_connection(): return mysql.connector.connect( user=Config.get('database.user'), database=Config.get('database.name'), password=Config.get('database.pass'), host=Config.get('database.host'), ) @staticmethod def execute(connection, sql, parameters = None): """ :param connection: :param sql: :type sql: str :param parameters: :type parameters: tuple :return: """ cursor = connection.cursor() cursor.execute((sql), parameters) results = [] for item in cursor: results.append(item) cursor.close() return results class SampleMapper(BaseMapper): def find_samples_by_name(self, connection, name): """ :param name: :type name: int :return: """ if connection is None: connection = self.create_connection() sql = "SELECT column_1, cplumn_2" " FROM samples" " WHERE samples.name = " + name records = BaseMapper.execute(connection, sql, phrase_id) samples = [] for record in records: sample = Sample() sample.column_1 = record[0] sample.column_2 = record[1] samples.append(sample) return samples |