Copied MySQL erlang library from ejabberd-modules SVN

This commit is contained in:
Badlop 2013-03-19 13:28:42 +01:00
parent 9deb294328
commit 1b304aaf0a
11 changed files with 1942 additions and 2 deletions

View File

@ -77,7 +77,7 @@ endif
prefix = @prefix@
exec_prefix = @exec_prefix@
SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @pam@ @web@ stringprep stun @tls@ @odbc@ @ejabberd_zlib@
SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @pam@ @web@ mysql stringprep stun @tls@ @odbc@ @ejabberd_zlib@
ERLSHLIBS += expat_erl.so
ERLBEHAVS = cyrsasl.erl gen_mod.erl p1_fsm.erl ejabberd_auth.erl
SOURCES_ALL = $(wildcard *.erl)

3
src/configure vendored
View File

@ -4696,7 +4696,7 @@ fi
ac_config_files="$ac_config_files Makefile $make_mod_irc $make_mod_muc $make_mod_pubsub $make_mod_proxy65 $make_eldap $make_pam $make_web stringprep/Makefile stun/Makefile $make_tls $make_odbc $make_ejabberd_zlib"
ac_config_files="$ac_config_files Makefile $make_mod_irc $make_mod_muc $make_mod_pubsub $make_mod_proxy65 $make_eldap $make_pam $make_web mysql/Makefile stringprep/Makefile stun/Makefile $make_tls $make_odbc $make_ejabberd_zlib"
#openssl
@ -5861,6 +5861,7 @@ do
"$make_eldap") CONFIG_FILES="$CONFIG_FILES $make_eldap" ;;
"$make_pam") CONFIG_FILES="$CONFIG_FILES $make_pam" ;;
"$make_web") CONFIG_FILES="$CONFIG_FILES $make_web" ;;
"mysql/Makefile") CONFIG_FILES="$CONFIG_FILES mysql/Makefile" ;;
"stringprep/Makefile") CONFIG_FILES="$CONFIG_FILES stringprep/Makefile" ;;
"stun/Makefile") CONFIG_FILES="$CONFIG_FILES stun/Makefile" ;;
"$make_tls") CONFIG_FILES="$CONFIG_FILES $make_tls" ;;

View File

@ -111,6 +111,7 @@ AC_CONFIG_FILES([Makefile
$make_eldap
$make_pam
$make_web
mysql/Makefile
stringprep/Makefile
stun/Makefile
$make_tls

68
src/mysql/COPYING Normal file
View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2001-2003 Kungliga Tekniska Högskolan
* (Royal Institute of Technology, Stockholm, Sweden).
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* 3. Neither the name of the Institute nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
Copyright (c) 2004, Sektionen för IT och media, Stockholms
universitet
All rights reserved.
Redistribution and use in source and binary forms, with or
without modification, are permitted provided that the following
conditions are met:
1. Redistributions of source code must retain the above
copyright notice, this list of conditions and the following
disclaimer.
2. Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials
provided with the distribution.
3. Neither the name of the author nor the names of its
contributors may be used to endorse or promote products
derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

38
src/mysql/Makefile.in Normal file
View File

@ -0,0 +1,38 @@
# $Id: Makefile.in 1453 2008-07-16 16:58:42Z badlop $
CC = @CC@
CFLAGS = @CFLAGS@
CPPFLAGS = @CPPFLAGS@
LDFLAGS = @LDFLAGS@
LIBS = @LIBS@
ERLANG_CFLAGS = @ERLANG_CFLAGS@
ERLANG_LIBS = @ERLANG_LIBS@
EFLAGS += -I ..
EFLAGS += -pz ..
# make debug=true to compile Erlang module with debug informations.
ifdef debug
EFLAGS+=+debug_info
endif
OUTDIR = ..
SOURCES = $(wildcard *.erl)
BEAMS = $(addprefix $(OUTDIR)/,$(SOURCES:.erl=.beam))
all: $(BEAMS)
$(OUTDIR)/%.beam: %.erl
@ERLC@ -W $(EFLAGS) -o $(OUTDIR) $<
clean:
rm -f $(BEAMS)
distclean: clean
rm -f Makefile
TAGS:
etags *.erl

18
src/mysql/Makefile.win32 Normal file
View File

@ -0,0 +1,18 @@
include ..\Makefile.inc
EFLAGS = -I .. -pz ..
OUTDIR = ..
BEAMS = ..\stun_codec.beam ..\ejabberd_stun.beam
ALL : $(BEAMS)
CLEAN :
-@erase $(BEAMS)
$(OUTDIR)\stun_codec.beam : stun_codec.erl
erlc -W $(EFLAGS) -o $(OUTDIR) stun_codec.erl
$(OUTDIR)\ejabberd_stun.beam : ejabberd_stun.erl
erlc -W $(EFLAGS) -o $(OUTDIR) ejabberd_stun.erl

692
src/mysql/mysql.erl Normal file
View File

@ -0,0 +1,692 @@
%%%-------------------------------------------------------------------
%%% File : mysql.erl
%%% Author : Magnus Ahltorp <ahltorp@nada.kth.se>
%%% Descrip.: MySQL client.
%%%
%%% Created : 4 Aug 2005 by Magnus Ahltorp <ahltorp@nada.kth.se>
%%%
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
%%% See the file COPYING
%%%
%%% Usage:
%%%
%%%
%%% Call one of the start-functions before any call to fetch/2
%%%
%%% start_link(Id, Host, User, Password, Database)
%%% start_link(Id, Host, Port, User, Password, Database)
%%% start_link(Id, Host, User, Password, Database, LogFun)
%%% start_link(Id, Host, Port, User, Password, Database, LogFun)
%%%
%%% Id is a connection group identifier. If you want to have more
%%% than one connection to a server (or a set of MySQL replicas),
%%% add more with
%%%
%%% connect(Id, Host, Port, User, Password, Database, Reconnect)
%%%
%%% use 'undefined' as Port to get default MySQL port number (3306).
%%% MySQL querys will be sent in a per-Id round-robin fashion.
%%% Set Reconnect to 'true' if you want the dispatcher to try and
%%% open a new connection, should this one die.
%%%
%%% When you have a mysql_dispatcher running, this is how you make a
%%% query :
%%%
%%% fetch(Id, "select * from hello") -> Result
%%% Result = {data, MySQLRes} | {updated, MySQLRes} |
%%% {error, MySQLRes}
%%%
%%% Actual data can be extracted from MySQLRes by calling the following API
%%% functions:
%%% - on data received:
%%% FieldInfo = mysql:get_result_field_info(MysqlRes)
%%% AllRows = mysql:get_result_rows(MysqlRes)
%%% with FieldInfo = list() of {Table, Field, Length, Name}
%%% and AllRows = list() of list() representing records
%%% - on update:
%%% Affected = mysql:get_result_affected_rows(MysqlRes)
%%% with Affected = integer()
%%% - on error:
%%% Reason = mysql:get_result_reason(MysqlRes)
%%% with Reason = string()
%%%
%%% If you just want a single MySQL connection, or want to manage your
%%% connections yourself, you can use the mysql_conn module as a
%%% stand-alone single MySQL connection. See the comment at the top of
%%% mysql_conn.erl.
%%%
%%%-------------------------------------------------------------------
-module(mysql).
-behaviour(gen_server).
%%--------------------------------------------------------------------
%% External exports
%%--------------------------------------------------------------------
-export([start_link/5,
start_link/6,
start_link/7,
fetch/2,
fetch/3,
get_result_field_info/1,
get_result_rows/1,
get_result_affected_rows/1,
get_result_reason/1,
quote/1,
asciz_binary/2,
connect/7,
stop/0,
gc_each/1
]).
%%--------------------------------------------------------------------
%% Internal exports - just for mysql_* modules
%%--------------------------------------------------------------------
-export([log/3,
log/4
]).
%%--------------------------------------------------------------------
%% Internal exports - gen_server callbacks
%%--------------------------------------------------------------------
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%%--------------------------------------------------------------------
%% Records
%%--------------------------------------------------------------------
-include("mysql.hrl").
-record(state, {
conn_list, %% list() of mysql_connection record()
log_fun, %% undefined | function for logging,
gc_tref %% undefined | timer:TRef
}).
-record(mysql_connection, {
id, %% term(), user of 'mysql' modules id of this socket group
conn_pid, %% pid(), mysql_conn process
reconnect, %% true | false, should mysql_dispatcher try to reconnect if this connection dies?
host, %% undefined | string()
port, %% undefined | integer()
user, %% undefined | string()
password, %% undefined | string()
database %% undefined | string()
}).
%%--------------------------------------------------------------------
%% Macros
%%--------------------------------------------------------------------
-define(SERVER, mysql_dispatcher).
-define(CONNECT_TIMEOUT, 5000).
-define(LOCAL_FILES, 128).
-define(PORT, 3306).
%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Id, Host, User, Password, Database)
%% start_link(Id, Host, Port, User, Password, Database)
%% start_link(Id, Host, User, Password, Database, LogFun)
%% start_link(Id, Host, Port, User, Password, Database,
%% LogFun)
%% Id = term(), first connection-group Id
%% Host = string()
%% Port = integer()
%% User = string()
%% Password = string()
%% Database = string()
%% LogFun = undefined | function() of arity 3
%% Descrip.: Starts the MySQL client gen_server process.
%% Returns : {ok, Pid} | ignore | {error, Error}
%%--------------------------------------------------------------------
start_link(Id, Host, User, Password, Database) when is_list(Host), is_list(User), is_list(Password),
is_list(Database) ->
start_link(Id, Host, ?PORT, User, Password, Database, undefined).
start_link(Id, Host, Port, User, Password, Database) when is_list(Host), is_integer(Port), is_list(User),
is_list(Password), is_list(Database) ->
start_link(Id, Host, Port, User, Password, Database, undefined);
start_link(Id, Host, User, Password, Database, LogFun) when is_list(Host), is_list(User), is_list(Password),
is_list(Database) ->
start_link(Id, Host, ?PORT, User, Password, Database, LogFun).
start_link(Id, Host, Port, User, Password, Database, LogFun) when is_list(Host), is_integer(Port), is_list(User),
is_list(Password), is_list(Database) ->
crypto:start(),
gen_server:start_link({local, ?SERVER}, ?MODULE, [Id, Host, Port, User, Password, Database, LogFun], []).
stop() ->
gen_server:call(?SERVER, stop).
gc_each(Millisec) ->
gen_server:call(?SERVER, {gc_each, Millisec}).
%%--------------------------------------------------------------------
%% Function: fetch(Id, Query)
%% fetch(Id, Query, Timeout)
%% Id = term(), connection-group Id
%% Query = string(), MySQL query in verbatim
%% Timeout = integer() | infinity, gen_server timeout value
%% Descrip.: Send a query and wait for the result.
%% Returns : {data, MySQLRes} |
%% {updated, MySQLRes} |
%% {error, MySQLRes}
%% MySQLRes = term()
%%--------------------------------------------------------------------
fetch(Id, Query) when is_list(Query) ->
gen_server:call(?SERVER, {fetch, Id, Query}).
fetch(Id, Query, Timeout) when is_list(Query) ->
gen_server:call(?SERVER, {fetch, Id, Query}, Timeout).
%%--------------------------------------------------------------------
%% Function: get_result_field_info(MySQLRes)
%% MySQLRes = term(), result of fetch function on "data"
%% Descrip.: Extract the FieldInfo from MySQL Result on data received
%% Returns : FieldInfo
%% FieldInfo = list() of {Table, Field, Length, Name}
%%--------------------------------------------------------------------
get_result_field_info(#mysql_result{fieldinfo = FieldInfo}) ->
FieldInfo.
%%--------------------------------------------------------------------
%% Function: get_result_rows(MySQLRes)
%% MySQLRes = term(), result of fetch function on "data"
%% Descrip.: Extract the Rows from MySQL Result on data received
%% Returns : Rows
%% Rows = list() of list() representing records
%%--------------------------------------------------------------------
get_result_rows(#mysql_result{rows=AllRows}) ->
AllRows.
%%--------------------------------------------------------------------
%% Function: get_result_affected_rows(MySQLRes)
%% MySQLRes = term(), result of fetch function on "updated"
%% Descrip.: Extract the Rows from MySQL Result on update
%% Returns : AffectedRows
%% AffectedRows = integer()
%%--------------------------------------------------------------------
get_result_affected_rows(#mysql_result{affectedrows=AffectedRows}) ->
AffectedRows.
%%--------------------------------------------------------------------
%% Function: get_result_reason(MySQLRes)
%% MySQLRes = term(), result of fetch function on "error"
%% Descrip.: Extract the error Reason from MySQL Result on error
%% Returns : Reason
%% Reason = string()
%%--------------------------------------------------------------------
get_result_reason(#mysql_result{error=Reason}) ->
Reason.
%%--------------------------------------------------------------------
%% Function: quote(String)
%% String = string()
%% Descrip.: Quote a string so that it can be included safely in a
%% MySQL query.
%% Returns : Quoted = string()
%%--------------------------------------------------------------------
quote(String) when is_list(String) ->
[34 | lists:reverse([34 | quote(String, [])])]. %% 34 is $"
quote([], Acc) ->
Acc;
quote([0 | Rest], Acc) ->
quote(Rest, [$0, $\\ | Acc]);
quote([10 | Rest], Acc) ->
quote(Rest, [$n, $\\ | Acc]);
quote([13 | Rest], Acc) ->
quote(Rest, [$r, $\\ | Acc]);
quote([$\\ | Rest], Acc) ->
quote(Rest, [$\\ , $\\ | Acc]);
quote([39 | Rest], Acc) -> %% 39 is $'
quote(Rest, [39, $\\ | Acc]); %% 39 is $'
quote([34 | Rest], Acc) -> %% 34 is $"
quote(Rest, [34, $\\ | Acc]); %% 34 is $"
quote([26 | Rest], Acc) ->
quote(Rest, [$Z, $\\ | Acc]);
quote([C | Rest], Acc) ->
quote(Rest, [C | Acc]).
%%--------------------------------------------------------------------
%% Function: asciz_binary(Data, Acc)
%% Data = binary()
%% Acc = list(), input accumulator
%% Descrip.: Find the first zero-byte in Data and add everything
%% before it to Acc, as a string.
%% Returns : {NewList, Rest}
%% NewList = list(), Acc plus what we extracted from Data
%% Rest = binary(), whatever was left of Data, not
%% including the zero-byte
%%--------------------------------------------------------------------
asciz_binary(<<>>, Acc) ->
{lists:reverse(Acc), <<>>};
asciz_binary(<<0:8, Rest/binary>>, Acc) ->
{lists:reverse(Acc), Rest};
asciz_binary(<<C:8, Rest/binary>>, Acc) ->
asciz_binary(Rest, [C | Acc]).
%%--------------------------------------------------------------------
%% Function: connect(Id, Host, Port, User, Password, Database,
%% Reconnect)
%% Id = term(), connection-group Id
%% Host = string()
%% Port = undefined | integer()
%% User = string()
%% Password = string()
%% Database = string()
%% Reconnect = true | false
%% Descrip.: Starts a MySQL connection and, if successfull, registers
%% it with the mysql_dispatcher.
%% Returns : {ok, ConnPid} | {error, Reason}
%%--------------------------------------------------------------------
connect(Id, Host, undefined, User, Password, Database, Reconnect) ->
connect(Id, Host, ?PORT, User, Password, Database, Reconnect);
connect(Id, Host, Port, User, Password, Database, Reconnect) ->
{ok, LogFun} = gen_server:call(?SERVER, get_logfun),
case mysql_conn:start(Host, Port, User, Password, Database, LogFun) of
{ok, ConnPid} ->
MysqlConn =
case Reconnect of
true ->
#mysql_connection{id = Id,
conn_pid = ConnPid,
reconnect = true,
host = Host,
port = Port,
user = User,
password = Password,
database = Database
};
false ->
#mysql_connection{id = Id,
conn_pid = ConnPid,
reconnect = false
}
end,
case gen_server:call(?SERVER, {add_mysql_connection, MysqlConn}) of
ok ->
{ok, ConnPid};
Res ->
Res
end;
{error, Reason} ->
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Function: log(LogFun, Level, Format)
%% log(LogFun, Level, Format, Arguments)
%% LogFun = undefined | function() with arity 3
%% Level = debug | normal | error
%% Format = string()
%% Arguments = list() of term()
%% Descrip.: Either call the function LogFun with the Level, Format
%% and Arguments as parameters or log it to the console if
%% LogFun is undefined.
%% Returns : void()
%%
%% Note : Exported only for use by the mysql_* modules.
%%
%%--------------------------------------------------------------------
log(LogFun, Level, Format) ->
log(LogFun, Level, Format, []).
log(LogFun, Level, Format, Arguments) when is_function(LogFun) ->
LogFun(Level, Format, Arguments);
log(undefined, _Level, Format, Arguments) ->
%% default is to log to console
io:format(Format, Arguments),
io:format("~n", []).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Args = [Id, Host, Port, User, Password, Database, LogFun]
%% Id = term(), connection-group Id
%% Host = string()
%% Port = integer()
%% User = string()
%% Password = string()
%% Database = string()
%% LogFun = undefined | function() with arity 3
%% Descrip.: Initiates the gen_server (MySQL dispatcher).
%%--------------------------------------------------------------------
init([Id, Host, Port, User, Password, Database, LogFun]) ->
case mysql_conn:start(Host, Port, User, Password, Database, LogFun) of
{ok, ConnPid} ->
MysqlConn = #mysql_connection{id = Id,
conn_pid = ConnPid,
reconnect = true,
host = Host,
port = Port,
user = User,
password = Password,
database = Database
},
case add_mysql_conn(MysqlConn, []) of
{ok, ConnList} ->
{ok, #state{log_fun = LogFun,
conn_list = ConnList,
gc_tref = undefined
}};
error ->
Msg = "mysql: Failed adding first MySQL connection handler to my list, exiting",
log(LogFun, error, Msg),
{error, Msg}
end;
{error, Reason} ->
log(LogFun, error, "mysql: Failed starting first MySQL connection handler, exiting"),
{stop, {error, Reason}}
end.
%%--------------------------------------------------------------------
%% Function: handle_call(Msg, From, State)
%% Descrip.: Handling call messages.
%% Returns : {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: handle_call({fetch, Id, Query}, From, State)
%% Id = term(), connection-group id
%% Query = string(), MySQL query
%% Descrip.: Make a MySQL query. Use the first connection matching Id
%% in our connection-list. Don't block the mysql_dispatcher
%% by returning {noreply, ...} here and let the mysql_conn
%% do gen_server:reply(...) when it has an answer.
%% Returns : {noreply, NewState} |
%% {reply, {error, Reason}, State}
%% NewState = state record()
%% Reason = atom() | string()
%%--------------------------------------------------------------------
handle_call({fetch, Id, Query}, From, State) ->
log(State#state.log_fun, debug, "mysql: fetch ~p (id ~p)", [Query, Id]),
case get_next_mysql_connection_for_id(Id, State#state.conn_list) of
{ok, MysqlConn, RestOfConnList} when is_record(MysqlConn, mysql_connection) ->
mysql_conn:fetch(MysqlConn#mysql_connection.conn_pid, Query, From),
%% move this mysql socket to the back of the list
NewConnList = RestOfConnList ++ [MysqlConn],
%% The ConnPid process does a gen_server:reply() when it has an answer
{noreply, State#state{conn_list = NewConnList}};
nomatch ->
%% we have no active connection matching Id
{reply, {error, no_connection}, State}
end;
%%--------------------------------------------------------------------
%% Function: handle_call({add_mysql_connection, Conn}, From, State)
%% Conn = mysql_connection record()
%% Descrip.: Add Conn to our list of connections.
%% Returns : {reply, Reply, NewState}
%% Reply = ok | {error, Reason}
%% NewState = state record()
%% Reason = string()
%%--------------------------------------------------------------------
handle_call({add_mysql_connection, Conn}, _From, State) when is_record(Conn, mysql_connection) ->
case add_mysql_conn(Conn, State#state.conn_list) of
{ok, NewConnList} ->
{Id, ConnPid} = {Conn#mysql_connection.id, Conn#mysql_connection.conn_pid},
log(State#state.log_fun, normal, "mysql: Added connection with id '~p' (pid ~p) to my list",
[Id, ConnPid]),
{reply, ok, State#state{conn_list = NewConnList}};
error ->
{reply, {error, "failed adding MySQL connection to my list"}, State}
end;
%%--------------------------------------------------------------------
%% Function: handle_call(get_logfun, From, State)
%% Descrip.: Fetch our logfun.
%% Returns : {reply, {ok, LogFun}, State}
%% LogFun = undefined | function() with arity 3
%%--------------------------------------------------------------------
handle_call(get_logfun, _From, State) ->
{reply, {ok, State#state.log_fun}, State};
handle_call(stop, _From, State) ->
{stop, normal, State};
handle_call({gc_each, Millisec}, _From, State) ->
case State#state.gc_tref of
undefined -> ok;
TRef ->
timer:cancel(TRef)
end,
case timer:send_interval(Millisec, gc) of
{ok, NewTRef} ->
{reply, ok, State#state{gc_tref = NewTRef}};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
handle_call(Unknown, _From, State) ->
log(State#state.log_fun, error, "mysql: Received unknown gen_server call : ~p", [Unknown]),
{reply, {error, "unknown gen_server call in mysql client"}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State)
%% Descrip.: Handling cast messages
%% Returns : {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_cast(Unknown, State) ->
log(State#state.log_fun, error, "mysql: Received unknown gen_server cast : ~p", [Unknown]),
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Msg, State)
%% Descrip.: Handling all non call/cast messages
%% Returns : {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Function: handle_info({'DOWN', ...}, State)
%% Descrip.: Handle a message that one of our monitored processes
%% (mysql_conn processes in our connection list) has exited.
%% Remove the entry from our list.
%% Returns : {noreply, NewState} |
%% {stop, normal, State}
%% NewState = state record()
%%
%% Note : For now, we stop if our connection list becomes empty.
%% We should try to reconnect for a while first, to not
%% eventually stop the whole OTP application if the MySQL-
%% server is shut down and the mysql_dispatcher was super-
%% vised by an OTP supervisor.
%%--------------------------------------------------------------------
handle_info({'DOWN', _MonitorRef, process, Pid, Info}, State) ->
LogFun = State#state.log_fun,
case remove_mysql_connection_using_pid(Pid, State#state.conn_list, []) of
{ok, Conn, NewConnList} ->
LogLevel = case Info of
normal -> normal;
_ -> error
end,
log(LogFun, LogLevel, "mysql: MySQL connection pid ~p exited : ~p", [Pid, Info]),
log(LogFun, normal, "mysql: Removed MySQL connection with pid ~p from list",
[Pid]),
case Conn#mysql_connection.reconnect of
true ->
start_reconnect(Conn, LogFun);
false ->
ok
end,
{noreply, State#state{conn_list = NewConnList}};
nomatch ->
log(LogFun, error, "mysql: Received 'DOWN' signal from pid ~p not in my list", [Pid]),
{noreply, State}
end;
handle_info(gc, #state{conn_list = Connections} = State) ->
[erlang:garbage_collect(C#mysql_connection.conn_pid) || C <- Connections],
erlang:garbage_collect(self()),
{noreply, State};
handle_info(Info, State) ->
log(State#state.log_fun, error, "mysql: Received unknown signal : ~p", [Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State)
%% Descrip.: Shutdown the server
%% Returns : Reason
%%--------------------------------------------------------------------
terminate(Reason, State) ->
LogFun = State#state.log_fun,
LogLevel = case Reason of
normal -> debug;
_ -> error
end,
log(LogFun, LogLevel, "mysql: Terminating with reason : ~p", [Reason]),
lists:foreach(fun(MysqlConn) ->
MysqlConn#mysql_connection.conn_pid ! close
end, State#state.conn_list),
Reason.
%%--------------------------------------------------------------------
%% Function: code_change(_OldVsn, State, _Extra)
%% Descrip.: Convert process state when code is changed
%% Returns : {ok, State}
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%====================================================================
%% Internal functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: add_mysql_conn(Conn, ConnList)
%% Conn = mysql_connection record()
%% ConnList = list() of mysql_connection record()
%% Descrip.: Set up process monitoring of the mysql_conn process and
%% then add it (first) to ConnList.
%% Returns : NewConnList = list() of mysql_connection record()
%%--------------------------------------------------------------------
add_mysql_conn(Conn, ConnList) when is_record(Conn, mysql_connection), is_list(ConnList) ->
erlang:monitor(process, Conn#mysql_connection.conn_pid),
{ok, [Conn | ConnList]}.
%%--------------------------------------------------------------------
%% Function: remove_mysql_connection_using_pid(Pid, ConnList)
%% Pid = pid()
%% ConnList = list() of mysql_connection record()
%% Descrip.: Removes the first mysql_connection in ConnList that has
%% a pid matching Pid.
%% Returns : {ok, Conn, NewConnList} | nomatch
%% Conn = mysql_connection record()
%% NewConnList = list() of mysql_connection record()
%%--------------------------------------------------------------------
remove_mysql_connection_using_pid(Pid, [#mysql_connection{conn_pid = Pid} = H | T], Res) ->
{ok, H, lists:reverse(Res) ++ T};
remove_mysql_connection_using_pid(Pid, [H | T], Res) when is_record(H, mysql_connection) ->
remove_mysql_connection_using_pid(Pid, T, [H | Res]);
remove_mysql_connection_using_pid(_Pid, [], _Res) ->
nomatch.
%%--------------------------------------------------------------------
%% Function: get_next_mysql_connection_for_id(Id, ConnList)
%% Id = term(), connection-group id
%% ConnList = list() of mysql_connection record()
%% Descrip.: Find the first mysql_connection in ConnList that has an
%% id matching Id.
%% Returns : {ok, Conn, NewConnList} | nomatch
%% Conn = mysql_connection record()
%% NewConnList = list() of mysql_connection record(), same
%% as ConnList but without Conn
%%--------------------------------------------------------------------
get_next_mysql_connection_for_id(Id, ConnList) ->
get_next_mysql_connection_for_id(Id, ConnList, []).
get_next_mysql_connection_for_id(Id, [#mysql_connection{id = Id} = H | T], Res) ->
{ok, H, lists:reverse(Res) ++ T};
get_next_mysql_connection_for_id(Id, [H | T], Res) when is_record(H, mysql_connection) ->
get_next_mysql_connection_for_id(Id, T, [H | Res]);
get_next_mysql_connection_for_id(_Id, [], _Res) ->
nomatch.
%%--------------------------------------------------------------------
%% Function: start_reconnect(Conn, LogFun)
%% Conn = mysql_connection record()
%% LogFun = undefined | function() with arity 3
%% Descrip.: Spawns a process that will try to re-establish a new
%% connection instead of the one in Conn which has just
%% died.
%% Returns : ok
%%--------------------------------------------------------------------
start_reconnect(Conn, LogFun) when is_record(Conn, mysql_connection) ->
Pid = spawn(fun () ->
reconnect_loop(Conn#mysql_connection{conn_pid = undefined}, LogFun, 0)
end),
{Id, Host, Port} = {Conn#mysql_connection.id, Conn#mysql_connection.host, Conn#mysql_connection.port},
log(LogFun, debug, "mysql: Started pid ~p to try and reconnect to ~p:~s:~p (replacing "
"connection with pid ~p)", [Pid, Id, Host, Port, Conn#mysql_connection.conn_pid]),
ok.
%%--------------------------------------------------------------------
%% Function: reconnect_loop(Conn, LogFun, 0)
%% Conn = mysql_connection record()
%% LogFun = undefined | function() with arity 3
%% Descrip.: Loop indefinately until we are able to reconnect to the
%% server specified in the now dead connection Conn.
%% Returns : ok
%%--------------------------------------------------------------------
reconnect_loop(Conn, LogFun, N) when is_record(Conn, mysql_connection) ->
{Id, Host, Port} = {Conn#mysql_connection.id, Conn#mysql_connection.host, Conn#mysql_connection.port},
case connect(Id,
Host,
Port,
Conn#mysql_connection.user,
Conn#mysql_connection.password,
Conn#mysql_connection.database,
Conn#mysql_connection.reconnect) of
{ok, ConnPid} ->
log(LogFun, debug, "mysql_reconnect: Managed to reconnect to ~p:~s:~p (connection pid ~p)",
[Id, Host, Port, ConnPid]),
ok;
{error, Reason} ->
%% log every once in a while
NewN = case N of
10 ->
log(LogFun, debug, "mysql_reconnect: Still unable to connect to ~p:~s:~p (~p)",
[Id, Host, Port, Reason]),
0;
_ ->
N + 1
end,
%% sleep between every unsuccessfull attempt
timer:sleep(20 * 1000),
reconnect_loop(Conn, LogFun, NewN)
end.

6
src/mysql/mysql.hrl Normal file
View File

@ -0,0 +1,6 @@
%% MySQL result record:
-record(mysql_result,
{fieldinfo=[],
rows=[],
affectedrows=0,
error=""}).

192
src/mysql/mysql_auth.erl Normal file
View File

@ -0,0 +1,192 @@
%%%-------------------------------------------------------------------
%%% File : mysql_auth.erl
%%% Author : Fredrik Thulin <ft@it.su.se>
%%% Descrip.: MySQL client authentication functions.
%%% Created : 4 Aug 2005 by Fredrik Thulin <ft@it.su.se>
%%%
%%% Note : All MySQL code was written by Magnus Ahltorp, originally
%%% in the file mysql.erl - I just moved it here.
%%%
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
%%% See the file COPYING
%%%
%%%-------------------------------------------------------------------
-module(mysql_auth).
%%--------------------------------------------------------------------
%% External exports (should only be used by the 'mysql_conn' module)
%%--------------------------------------------------------------------
-export([
do_old_auth/7,
do_new_auth/8
]).
%%--------------------------------------------------------------------
%% Macros
%%--------------------------------------------------------------------
-define(LONG_PASSWORD, 1).
-define(FOUND_ROWS, 2).
-define(LONG_FLAG, 4).
-define(PROTOCOL_41, 512).
-define(TRANSACTIONS, 8192).
-define(SECURE_CONNECTION, 32768).
-define(CONNECT_WITH_DB, 8).
-define(MAX_PACKET_SIZE, 1000000).
%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: do_old_auth(Sock, RecvPid, SeqNum, User, Password, Salt1,
%% LogFun)
%% Sock = term(), gen_tcp socket
%% RecvPid = pid(), receiver process pid
%% SeqNum = integer(), first sequence number we should use
%% User = string(), MySQL username
%% Password = string(), MySQL password
%% Salt1 = string(), salt 1 from server greeting
%% LogFun = undefined | function() of arity 3
%% Descrip.: Perform old-style MySQL authentication.
%% Returns : result of mysql_conn:do_recv/3
%%--------------------------------------------------------------------
do_old_auth(Sock, RecvPid, SeqNum, User, Password, Salt1, LogFun) ->
Auth = password_old(Password, Salt1),
Packet2 = make_auth(User, Auth),
do_send(Sock, Packet2, SeqNum, LogFun),
mysql_conn:do_recv(LogFun, RecvPid, SeqNum).
%%--------------------------------------------------------------------
%% Function: do_new_auth(Sock, RecvPid, SeqNum, User, Password, Salt1,
%% Salt2, LogFun)
%% Sock = term(), gen_tcp socket
%% RecvPid = pid(), receiver process pid
%% SeqNum = integer(), first sequence number we should use
%% User = string(), MySQL username
%% Password = string(), MySQL password
%% Salt1 = string(), salt 1 from server greeting
%% Salt2 = string(), salt 2 from server greeting
%% LogFun = undefined | function() of arity 3
%% Descrip.: Perform MySQL authentication.
%% Returns : result of mysql_conn:do_recv/3
%%--------------------------------------------------------------------
do_new_auth(Sock, RecvPid, SeqNum, User, Password, Salt1, Salt2, LogFun) ->
Auth = password_new(Password, Salt1 ++ Salt2),
Packet2 = make_new_auth(User, Auth, none),
do_send(Sock, Packet2, SeqNum, LogFun),
case mysql_conn:do_recv(LogFun, RecvPid, SeqNum) of
{ok, Packet3, SeqNum2} ->
case Packet3 of
<<254:8>> ->
AuthOld = password_old(Password, Salt1),
do_send(Sock, <<AuthOld/binary, 0:8>>, SeqNum2 + 1, LogFun),
mysql_conn:do_recv(LogFun, RecvPid, SeqNum2 + 1);
_ ->
{ok, Packet3, SeqNum2}
end;
{error, Reason} ->
{error, Reason}
end.
%%====================================================================
%% Internal functions
%%====================================================================
password_old(Password, Salt) ->
{P1, P2} = hash(Password),
{S1, S2} = hash(Salt),
Seed1 = P1 bxor S1,
Seed2 = P2 bxor S2,
List = rnd(9, Seed1, Seed2),
{L, [Extra]} = lists:split(8, List),
list_to_binary(lists:map(fun (E) ->
E bxor (Extra - 64)
end, L)).
%% part of do_old_auth/4, which is part of mysql_init/4
make_auth(User, Password) ->
Caps = ?LONG_PASSWORD bor ?LONG_FLAG
bor ?TRANSACTIONS bor ?FOUND_ROWS,
Maxsize = 0,
UserB = list_to_binary(User),
PasswordB = Password,
<<Caps:16/little, Maxsize:24/little, UserB/binary, 0:8,
PasswordB/binary>>.
%% part of do_new_auth/4, which is part of mysql_init/4
make_new_auth(User, Password, Database) ->
DBCaps = case Database of
none ->
0;
_ ->
?CONNECT_WITH_DB
end,
Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS bor
?PROTOCOL_41 bor ?SECURE_CONNECTION bor DBCaps
bor ?FOUND_ROWS,
Maxsize = ?MAX_PACKET_SIZE,
UserB = list_to_binary(User),
PasswordL = size(Password),
DatabaseB = case Database of
none ->
<<>>;
_ ->
list_to_binary(Database)
end,
<<Caps:32/little, Maxsize:32/little, 8:8, 0:23/integer-unit:8,
UserB/binary, 0:8, PasswordL:8, Password/binary, DatabaseB/binary>>.
hash(S) ->
hash(S, 1345345333, 305419889, 7).
hash([C | S], N1, N2, Add) ->
N1_1 = N1 bxor (((N1 band 63) + Add) * C + N1 * 256),
N2_1 = N2 + ((N2 * 256) bxor N1_1),
Add_1 = Add + C,
hash(S, N1_1, N2_1, Add_1);
hash([], N1, N2, _Add) ->
Mask = (1 bsl 31) - 1,
{N1 band Mask , N2 band Mask}.
rnd(N, Seed1, Seed2) ->
Mod = (1 bsl 30) - 1,
rnd(N, [], Seed1 rem Mod, Seed2 rem Mod).
rnd(0, List, _, _) ->
lists:reverse(List);
rnd(N, List, Seed1, Seed2) ->
Mod = (1 bsl 30) - 1,
NSeed1 = (Seed1 * 3 + Seed2) rem Mod,
NSeed2 = (NSeed1 + Seed2 + 33) rem Mod,
Float = (float(NSeed1) / float(Mod))*31,
Val = trunc(Float)+64,
rnd(N - 1, [Val | List], NSeed1, NSeed2).
dualmap(_F, [], []) ->
[];
dualmap(F, [E1 | R1], [E2 | R2]) ->
[F(E1, E2) | dualmap(F, R1, R2)].
bxor_binary(B1, B2) ->
list_to_binary(dualmap(fun (E1, E2) ->
E1 bxor E2
end, binary_to_list(B1), binary_to_list(B2))).
password_new(Password, Salt) ->
Stage1 = crypto:sha(Password),
Stage2 = crypto:sha(Stage1),
Res = crypto:sha_final(
crypto:sha_update(
crypto:sha_update(crypto:sha_init(), Salt),
Stage2)
),
bxor_binary(Res, Stage1).
do_send(Sock, Packet, Num, LogFun) ->
mysql:log(LogFun, debug, "mysql_auth send packet ~p: ~p", [Num, Packet]),
Data = <<(size(Packet)):24/little, Num:8, Packet/binary>>,
gen_tcp:send(Sock, Data).

759
src/mysql/mysql_conn.erl Normal file
View File

@ -0,0 +1,759 @@
%%%-------------------------------------------------------------------
%%% File : mysql_conn.erl
%%% Author : Fredrik Thulin <ft@it.su.se>
%%% Descrip.: MySQL connection handler, handles de-framing of messages
%%% received by the MySQL receiver process.
%%% Created : 5 Aug 2005 by Fredrik Thulin <ft@it.su.se>
%%% Modified: 11 Jan 2006 by Mickael Remond <mickael.remond@process-one.net>
%%%
%%% Note : All MySQL code was written by Magnus Ahltorp, originally
%%% in the file mysql.erl - I just moved it here.
%%%
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
%%% See the file COPYING
%%%
%%%
%%% This module handles a single connection to a single MySQL server.
%%% You can use it stand-alone, or through the 'mysql' module if you
%%% want to have more than one connection to the server, or
%%% connections to different servers.
%%%
%%% To use it stand-alone, set up the connection with
%%%
%%% {ok, Pid} = mysql_conn:start(Host, Port, User, Password,
%%% Database, LogFun)
%%%
%%% Host = string()
%%% Port = integer()
%%% User = string()
%%% Password = string()
%%% Database = string()
%%% LogFun = undefined | (gives logging to console)
%%% function() of arity 3 (Level, Fmt, Args)
%%%
%%% Note: In stand-alone mode you have to start Erlang crypto application by
%%% yourself with crypto:start()
%%%
%%% and then make MySQL querys with
%%%
%%% Result = mysql_conn:fetch(Pid, Query, self())
%%%
%%% Result = {data, MySQLRes} |
%%% {updated, MySQLRes} |
%%% {error, MySQLRes}
%%% Where: MySQLRes = #mysql_result
%%%
%%% Actual data can be extracted from MySQLRes by calling the following API
%%% functions:
%%% - on data received:
%%% FieldInfo = mysql:get_result_field_info(MysqlRes)
%%% AllRows = mysql:get_result_rows(MysqlRes)
%%% with FieldInfo = list() of {Table, Field, Length, Name}
%%% and AllRows = list() of list() representing records
%%% - on update:
%%% Affected= mysql:get_result_affected_rows(MysqlRes)
%%% with Affected = integer()
%%% - on error:
%%% Reason = mysql:get_result_reason(MysqlRes)
%%% with Reason = string()
%%%-------------------------------------------------------------------
-module(mysql_conn).
%%--------------------------------------------------------------------
%% External exports
%%--------------------------------------------------------------------
-export([start/6,
start_link/6,
fetch/3,
fetch/4,
squery/4,
stop/1
]).
%%--------------------------------------------------------------------
%% External exports (should only be used by the 'mysql_auth' module)
%%--------------------------------------------------------------------
-export([do_recv/3
]).
-include("mysql.hrl").
-record(state, {
mysql_version,
log_fun,
recv_pid,
socket,
data
}).
-define(SECURE_CONNECTION, 32768).
-define(MYSQL_QUERY_OP, 3).
-define(DEFAULT_STANDALONE_TIMEOUT, 5000).
-define(DEFAULT_RESULT_TYPE, list).
-define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x
-define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x
%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start(Host, Port, User, Password, Database, LogFun)
%% Function: start_link(Host, Port, User, Password, Database, LogFun)
%% Host = string()
%% Port = integer()
%% User = string()
%% Password = string()
%% Database = string()
%% LogFun = undefined | function() of arity 3
%% Descrip.: Starts a mysql_conn process that connects to a MySQL
%% server, logs in and chooses a database.
%% Returns : {ok, Pid} | {error, Reason}
%% Pid = pid()
%% Reason = string()
%%--------------------------------------------------------------------
start(Host, Port, User, Password,
Database, LogFun) when is_list(Host),
is_integer(Port),
is_list(User),
is_list(Password),
is_list(Database) ->
ConnPid = self(),
Pid = spawn(fun () ->
init(Host, Port, User, Password, Database,
LogFun, ConnPid)
end),
post_start(Pid, LogFun).
start_link(Host, Port, User, Password,
Database, LogFun) when is_list(Host),
is_integer(Port),
is_list(User),
is_list(Password),
is_list(Database) ->
ConnPid = self(),
Pid = spawn_link(fun () ->
init(Host, Port, User, Password, Database,
LogFun, ConnPid)
end),
post_start(Pid, LogFun).
%% part of start/6 or start_link/6:
post_start(Pid, _LogFun) ->
%%Timeout = get_option(timeout, Options, ?DEFAULT_STANDALONE_TIMEOUT),
%%TODO find a way to get configured Options here
Timeout= ?DEFAULT_STANDALONE_TIMEOUT,
receive
{mysql_conn, Pid, ok} ->
{ok, Pid};
{mysql_conn, Pid, {error, Reason}} ->
mysql:log(_LogFun, error, "mysql_conn: post_start error ~p~n",
[Reason]),
stop(Pid),
{error, Reason}
% Unknown ->
% mysql:log(_LogFun, error, "mysql_conn: Received unknown signal, exiting"),
% mysql:log(_LogFun, debug, "mysql_conn: Unknown signal : ~p", [Unknown]),
% {error, "unknown signal received"}
after Timeout ->
mysql:log(_LogFun, error, "mysql_conn: post_start timeout~n",
[]),
stop(Pid),
{error, "timed out"}
end.
%%--------------------------------------------------------------------
%% Function: fetch(Pid, Query, From)
%% fetch(Pid, Query, From, Timeout)
%% Pid = pid(), mysql_conn to send fetch-request to
%% Query = string(), MySQL query in verbatim
%% From = pid() or term(), use a From of self() when
%% using this module for a single connection,
%% or pass the gen_server:call/3 From argument if
%% using a gen_server to do the querys (e.g. the
%% mysql_dispatcher)
%% Timeout = integer() | infinity, gen_server timeout value
%% Descrip.: Send a query and wait for the result if running stand-
%% alone (From = self()), but don't block the caller if we
%% are not running stand-alone (From = gen_server From).
%% Returns : ok | (non-stand-alone mode)
%% {data, #mysql_result} | (stand-alone mode)
%% {updated, #mysql_result} | (stand-alone mode)
%% {error, #mysql_result} (stand-alone mode)
%% FieldInfo = term()
%% Rows = list() of [string()]
%% Reason = term()
%%--------------------------------------------------------------------
fetch(Pid, Query, From) ->
squery(Pid, Query, From, []).
fetch(Pid, Query, From, Timeout) ->
squery(Pid, Query, From, [{timeout, Timeout}]).
squery(Pid, Query, From, Options) when is_pid(Pid), is_list(Query) ->
Self = self(),
Timeout = get_option(timeout, Options, ?DEFAULT_STANDALONE_TIMEOUT),
TRef = erlang:start_timer(Timeout, self(), timeout),
Pid ! {fetch, TRef, Query, From, Options},
case From of
Self ->
%% We are not using a mysql_dispatcher, await the response
wait_fetch_result(TRef, Pid);
_ ->
%% From is gen_server From, Pid will do gen_server:reply()
%% when it has an answer
ok
end.
wait_fetch_result(TRef, Pid) ->
receive
{fetch_result, TRef, Pid, Result} ->
case erlang:cancel_timer(TRef) of
false ->
receive
{timeout, TRef, _} ->
ok
after 0 ->
ok
end;
_ ->
ok
end,
Result;
{fetch_result, _BadRef, Pid, _Result} ->
wait_fetch_result(TRef, Pid);
{timeout, TRef, _Info} ->
stop(Pid),
{error, "query timed out"}
end.
stop(Pid) ->
Pid ! close.
%%--------------------------------------------------------------------
%% Function: do_recv(LogFun, RecvPid, SeqNum)
%% LogFun = undefined | function() with arity 3
%% RecvPid = pid(), mysql_recv process
%% SeqNum = undefined | integer()
%% Descrip.: Wait for a frame decoded and sent to us by RecvPid.
%% Either wait for a specific frame if SeqNum is an integer,
%% or just any frame if SeqNum is undefined.
%% Returns : {ok, Packet, Num} |
%% {error, Reason}
%% Reason = term()
%%
%% Note : Only to be used externally by the 'mysql_auth' module.
%%--------------------------------------------------------------------
do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun);
LogFun == undefined,
SeqNum == undefined ->
receive
{mysql_recv, RecvPid, data, Packet, Num} ->
%%mysql:log(LogFun, debug, "mysql_conn: recv packet ~p:
%%~p", [Num, Packet]),
{ok, Packet, Num};
{mysql_recv, RecvPid, closed, _E} ->
mysql:log(LogFun, error, "mysql_conn: mysql_recv:"
" socket was closed ~p~n", [{RecvPid, _E}]),
{error, "mysql_recv: socket was closed"}
end;
do_recv(LogFun, RecvPid, SeqNum) when is_function(LogFun);
LogFun == undefined,
is_integer(SeqNum) ->
ResponseNum = SeqNum + 1,
receive
{mysql_recv, RecvPid, data, Packet, ResponseNum} ->
%%mysql:log(LogFun, debug, "mysql_conn: recv packet ~p:
%%~p", [ResponseNum, Packet]),
{ok, Packet, ResponseNum};
{mysql_recv, RecvPid, closed, _E} ->
mysql:log(LogFun, error, "mysql_conn: mysql_recv:"
" socket was closed 2 ~p~n", [{RecvPid, _E}]),
{error, "mysql_recv: socket was closed"}
end.
%%====================================================================
%% Internal functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Host, Port, User, Password, Database, LogFun,
%% Parent)
%% Host = string()
%% Port = integer()
%% User = string()
%% Password = string()
%% Database = string()
%% LogFun = undefined | function() of arity 3
%% Parent = pid() of process starting this mysql_conn
%% Descrip.: Connect to a MySQL server, log in and chooses a database.
%% Report result of this to Parent, and then enter loop() if
%% we were successfull.
%% Returns : void() | does not return
%%--------------------------------------------------------------------
init(Host, Port, User, Password, Database, LogFun, Parent) ->
case mysql_recv:start_link(Host, Port, LogFun, self()) of
{ok, RecvPid, Sock} ->
case mysql_init(Sock, RecvPid, User, Password, LogFun) of
{ok, Version} ->
case do_query(Sock, RecvPid, LogFun, "use " ++ Database,
Version, [{result_type, binary}]) of
{error, MySQLRes} ->
mysql:log(LogFun, error,
"mysql_conn: Failed changing"
" to database ~p : ~p",
[Database,
mysql:get_result_reason(MySQLRes)]),
gen_tcp:close(Sock),
Parent ! {mysql_conn, self(),
{error, failed_changing_database}};
%% ResultType: data | updated
{_ResultType, _MySQLRes} ->
Parent ! {mysql_conn, self(), ok},
State = #state{mysql_version=Version,
recv_pid = RecvPid,
socket = Sock,
log_fun = LogFun,
data = <<>>
},
loop(State)
end;
{error, _Reason} ->
Parent ! {mysql_conn, self(), {error, login_failed}}
end;
E ->
mysql:log(LogFun, error, "mysql_conn: "
"Failed connecting to ~p:~p : ~p",
[Host, Port, E]),
Parent ! {mysql_conn, self(), {error, connect_failed}}
end.
%%--------------------------------------------------------------------
%% Function: loop(State)
%% State = state record()
%% Descrip.: Wait for signals asking us to perform a MySQL query, or
%% signals that the socket was closed.
%% Returns : error | does not return
%%--------------------------------------------------------------------
loop(State) ->
RecvPid = State#state.recv_pid,
receive
{fetch, Ref, Query, GenSrvFrom, Options} ->
%% GenSrvFrom is either a gen_server:call/3 From term(),
%% or a pid if no gen_server was used to make the query
Res = do_query(State, Query, Options),
case is_pid(GenSrvFrom) of
true ->
%% The query was not sent using gen_server mechanisms
GenSrvFrom ! {fetch_result, Ref, self(), Res};
false ->
%% the timer is canceled in wait_fetch_result/2, but we wait on that funtion only if the query
%% was not sent using the mysql gen_server. So we at least should try to cancel the timer here
%% (no warranty, the gen_server can still receive timeout messages)
erlang:cancel_timer(Ref),
gen_server:reply(GenSrvFrom, Res)
end,
loop(State);
{mysql_recv, RecvPid, data, Packet, Num} ->
mysql:log(State#state.log_fun, error, "mysql_conn: "
"Received MySQL data when not expecting any "
"(num ~p) - ignoring it", [Num]),
mysql:log(State#state.log_fun, error, "mysql_conn: "
"Unexpected MySQL data (num ~p) :~n~p",
[Num, Packet]),
loop(State);
close ->
mysql:log(State#state.log_fun, error, "mysql_conn: "
"Received close signal, exiting.", []),
close_connection(State);
Unknown ->
mysql:log(State#state.log_fun, error, "mysql_conn: "
"Received unknown signal, exiting : ~p",
[Unknown]),
close_connection(State),
error
end.
%%--------------------------------------------------------------------
%% Function: mysql_init(Sock, RecvPid, User, Password, LogFun)
%% Sock = term(), gen_tcp socket
%% RecvPid = pid(), mysql_recv process
%% User = string()
%% Password = string()
%% LogFun = undefined | function() with arity 3
%% Descrip.: Try to authenticate on our new socket.
%% Returns : ok | {error, Reason}
%% Reason = string()
%%--------------------------------------------------------------------
mysql_init(Sock, RecvPid, User, Password, LogFun) ->
case do_recv(LogFun, RecvPid, undefined) of
{ok, Packet, InitSeqNum} ->
{Version, Salt1, Salt2, Caps} = greeting(Packet, LogFun),
AuthRes =
case Caps band ?SECURE_CONNECTION of
?SECURE_CONNECTION ->
mysql_auth:do_new_auth(Sock, RecvPid,
InitSeqNum + 1,
User, Password,
Salt1, Salt2, LogFun);
_ ->
mysql_auth:do_old_auth(Sock, RecvPid,
InitSeqNum + 1,
User, Password,
Salt1, LogFun)
end,
case AuthRes of
{ok, <<0:8, _Rest/binary>>, _RecvNum} ->
{ok,Version};
{ok, <<255:8, Code:16/little, Message/binary>>, _RecvNum} ->
mysql:log(LogFun, error, "mysql_conn: "
"init error ~p: ~p~n",
[Code, binary_to_list(Message)]),
{error, binary_to_list(Message)};
{ok, RecvPacket, _RecvNum} ->
mysql:log(LogFun, error, "mysql_conn: "
"init unknown error ~p~n",
[binary_to_list(RecvPacket)]),
{error, binary_to_list(RecvPacket)};
{error, Reason} ->
mysql:log(LogFun, error, "mysql_conn: "
"init failed receiving data : ~p~n",
[Reason]),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
%% part of mysql_init/4
greeting(Packet, LogFun) ->
<<Protocol:8, Rest/binary>> = Packet,
{Version, Rest2} = asciz(Rest),
<<_TreadID:32/little, Rest3/binary>> = Rest2,
{Salt, Rest4} = asciz(Rest3),
<<Caps:16/little, Rest5/binary>> = Rest4,
<<ServerChar:16/binary-unit:8, Rest6/binary>> = Rest5,
{Salt2, _Rest7} = asciz(Rest6),
mysql:log(LogFun, debug, "mysql_conn: greeting version ~p (protocol ~p) "
"salt ~p caps ~p serverchar ~p salt2 ~p",
[Version, Protocol, Salt, Caps, ServerChar, Salt2]),
{normalize_version(Version, LogFun), Salt, Salt2, Caps}.
%% part of greeting/2
asciz(Data) when is_binary(Data) ->
mysql:asciz_binary(Data, []);
asciz(Data) when is_list(Data) ->
{String, [0 | Rest]} = lists:splitwith(fun (C) ->
C /= 0
end, Data),
{String, Rest}.
%%--------------------------------------------------------------------
%% Function: get_query_response(LogFun, RecvPid)
%% LogFun = undefined | function() with arity 3
%% RecvPid = pid(), mysql_recv process
%% Version = integer(), Representing MySQL version used
%% Descrip.: Wait for frames until we have a complete query response.
%% Returns : {data, #mysql_result}
%% {updated, #mysql_result}
%% {error, #mysql_result}
%% FieldInfo = list() of term()
%% Rows = list() of [string()]
%% AffectedRows = int()
%% Reason = term()
%%--------------------------------------------------------------------
get_query_response(LogFun, RecvPid, Version, Options) ->
case do_recv(LogFun, RecvPid, undefined) of
{ok, <<Fieldcount:8, Rest/binary>>, _} ->
case Fieldcount of
0 ->
%% No Tabular data
<<AffectedRows:8, _Rest2/binary>> = Rest,
{updated, #mysql_result{affectedrows=AffectedRows}};
255 ->
<<_Code:16/little, Message/binary>> = Rest,
{error, #mysql_result{error=binary_to_list(Message)}};
_ ->
%% Tabular data received
ResultType = get_option(result_type, Options, ?DEFAULT_RESULT_TYPE),
case get_fields(LogFun, RecvPid, [], Version, ResultType) of
{ok, Fields} ->
case get_rows(Fieldcount, LogFun, RecvPid, ResultType, []) of
{ok, Rows} ->
{data, #mysql_result{fieldinfo=Fields,
rows=Rows}};
{error, Reason} ->
{error, #mysql_result{error=Reason}}
end;
{error, Reason} ->
{error, #mysql_result{error=Reason}}
end
end;
{error, Reason} ->
{error, #mysql_result{error=Reason}}
end.
%%--------------------------------------------------------------------
%% Function: get_fields(LogFun, RecvPid, [], Version)
%% LogFun = undefined | function() with arity 3
%% RecvPid = pid(), mysql_recv process
%% Version = integer(), Representing MySQL version used
%% Descrip.: Received and decode field information.
%% Returns : {ok, FieldInfo} |
%% {error, Reason}
%% FieldInfo = list() of term()
%% Reason = term()
%%--------------------------------------------------------------------
%% Support for MySQL 4.0.x:
get_fields(LogFun, RecvPid, Res, ?MYSQL_4_0, ResultType) ->
case do_recv(LogFun, RecvPid, undefined) of
{ok, Packet, _Num} ->
case Packet of
<<254:8>> ->
{ok, lists:reverse(Res)};
<<254:8, Rest/binary>> when size(Rest) < 8 ->
{ok, lists:reverse(Res)};
_ ->
{Table, Rest} = get_with_length(Packet),
{Field, Rest2} = get_with_length(Rest),
{LengthB, Rest3} = get_with_length(Rest2),
LengthL = size(LengthB) * 8,
<<Length:LengthL/little>> = LengthB,
{Type, Rest4} = get_with_length(Rest3),
{_Flags, _Rest5} = get_with_length(Rest4),
if ResultType == list ->
This = {binary_to_list(Table),
binary_to_list(Field),
Length,
%% TODO: Check on MySQL 4.0 if types are specified
%% using the same 4.1 formalism and could
%% be expanded to atoms:
binary_to_list(Type)};
ResultType == binary ->
This = {Table, Field, Length, Type}
end,
get_fields(LogFun, RecvPid, [This | Res],
?MYSQL_4_0, ResultType)
end;
{error, Reason} ->
{error, Reason}
end;
%% Support for MySQL 4.1.x and 5.x:
get_fields(LogFun, RecvPid, Res, ?MYSQL_4_1, ResultType) ->
case do_recv(LogFun, RecvPid, undefined) of
{ok, Packet, _Num} ->
case Packet of
<<254:8>> ->
{ok, lists:reverse(Res)};
<<254:8, Rest/binary>> when size(Rest) < 8 ->
{ok, lists:reverse(Res)};
_ ->
{_Catalog, Rest} = get_with_length(Packet),
{_Database, Rest2} = get_with_length(Rest),
{Table, Rest3} = get_with_length(Rest2),
%% OrgTable is the real table name if Table is an alias
{_OrgTable, Rest4} = get_with_length(Rest3),
{Field, Rest5} = get_with_length(Rest4),
%% OrgField is the real field name if Field is an alias
{_OrgField, Rest6} = get_with_length(Rest5),
<<_Metadata:8/little, _Charset:16/little,
Length:32/little, Type:8/little,
_Flags:16/little, _Decimals:8/little,
_Rest7/binary>> = Rest6,
if ResultType == list ->
This = {binary_to_list(Table),
binary_to_list(Field),
Length,
get_field_datatype(Type)};
ResultType == binary ->
This = {Table, Field, Length,
get_field_datatype(Type)}
end,
get_fields(LogFun, RecvPid, [This | Res],
?MYSQL_4_1, ResultType)
end;
{error, Reason} ->
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Function: get_rows(N, LogFun, RecvPid, [])
%% N = integer(), number of rows to get
%% LogFun = undefined | function() with arity 3
%% RecvPid = pid(), mysql_recv process
%% Descrip.: Receive and decode a number of rows.
%% Returns : {ok, Rows} |
%% {error, Reason}
%% Rows = list() of [string()]
%%--------------------------------------------------------------------
get_rows(N, LogFun, RecvPid, ResultType, Res) ->
case do_recv(LogFun, RecvPid, undefined) of
{ok, Packet, _Num} ->
case Packet of
<<254:8, Rest/binary>> when size(Rest) < 8 ->
{ok, lists:reverse(Res)};
_ ->
{ok, This} = get_row(N, Packet, ResultType, []),
get_rows(N, LogFun, RecvPid, ResultType, [This | Res])
end;
{error, Reason} ->
{error, Reason}
end.
%% part of get_rows/4
get_row(0, _Data, _ResultType, Res) ->
{ok, lists:reverse(Res)};
get_row(N, Data, ResultType, Res) ->
{Col, Rest} = get_with_length(Data),
This = case Col of
null ->
null;
_ ->
if
ResultType == list ->
binary_to_list(Col);
ResultType == binary ->
Col
end
end,
get_row(N - 1, Rest, ResultType, [This | Res]).
get_with_length(<<251:8, Rest/binary>>) ->
{null, Rest};
get_with_length(<<252:8, Length:16/little, Rest/binary>>) ->
split_binary(Rest, Length);
get_with_length(<<253:8, Length:24/little, Rest/binary>>) ->
split_binary(Rest, Length);
get_with_length(<<254:8, Length:64/little, Rest/binary>>) ->
split_binary(Rest, Length);
get_with_length(<<Length:8, Rest/binary>>) when Length < 251 ->
split_binary(Rest, Length).
close_connection(State) ->
Result = gen_tcp:close(State#state.socket),
mysql:log(State#state.log_fun, normal, "Closing connection ~p: ~p~n",
[State#state.socket, Result]),
Result.
%%--------------------------------------------------------------------
%% Function: do_query(State, Query)
%% do_query(Sock, RecvPid, LogFun, Query)
%% Sock = term(), gen_tcp socket
%% RecvPid = pid(), mysql_recv process
%% LogFun = undefined | function() with arity 3
%% Query = string()
%% Descrip.: Send a MySQL query and block awaiting it's response.
%% Returns : result of get_query_response/2 | {error, Reason}
%%--------------------------------------------------------------------
do_query(State, Query, Options) when is_record(State, state) ->
do_query(State#state.socket,
State#state.recv_pid,
State#state.log_fun,
Query,
State#state.mysql_version,
Options
).
do_query(Sock, RecvPid, LogFun, Query, Version, Options) when is_pid(RecvPid),
is_list(Query) ->
Packet = list_to_binary([?MYSQL_QUERY_OP, Query]),
case do_send(Sock, Packet, 0, LogFun) of
ok ->
get_query_response(LogFun, RecvPid, Version, Options);
{error, Reason} ->
Msg = io_lib:format("Failed sending data on socket : ~p", [Reason]),
{error, Msg}
end.
%%--------------------------------------------------------------------
%% Function: do_send(Sock, Packet, SeqNum, LogFun)
%% Sock = term(), gen_tcp socket
%% Packet = binary()
%% SeqNum = integer(), packet sequence number
%% LogFun = undefined | function() with arity 3
%% Descrip.: Send a packet to the MySQL server.
%% Returns : result of gen_tcp:send/2
%%--------------------------------------------------------------------
do_send(Sock, Packet, SeqNum, _LogFun) when is_binary(Packet),
is_integer(SeqNum) ->
Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>,
%%mysql:log(LogFun, debug, "mysql_conn: send packet ~p: ~p",
%%[SeqNum, Data]),
gen_tcp:send(Sock, Data).
%%--------------------------------------------------------------------
%% Function: normalize_version(Version, LogFun)
%% Version = string()
%% LogFun = undefined | function() with arity 3
%% Descrip.: Return a flag corresponding to the MySQL version used.
%% The protocol used depends on this flag.
%% Returns : Version = string()
%%--------------------------------------------------------------------
normalize_version([$4,$.,$0|_T], LogFun) ->
mysql:log(LogFun, debug, "Switching to MySQL 4.0.x protocol.~n"),
?MYSQL_4_0;
normalize_version([$4,$.,$1|_T], _LogFun) ->
?MYSQL_4_1;
normalize_version([$5|_T], _LogFun) ->
%% MySQL version 5.x protocol is compliant with MySQL 4.1.x:
?MYSQL_4_1;
normalize_version(_Other, LogFun) ->
mysql:log(LogFun, error, "MySQL version not supported: MySQL Erlang "
"module might not work correctly.~n"),
%% Error, but trying the oldest protocol anyway:
?MYSQL_4_0.
%%--------------------------------------------------------------------
%% Function: get_field_datatype(DataType)
%% DataType = integer(), MySQL datatype
%% Descrip.: Return MySQL field datatype as description string
%% Returns : String, MySQL datatype
%%--------------------------------------------------------------------
get_field_datatype(0) -> 'DECIMAL';
get_field_datatype(1) -> 'TINY';
get_field_datatype(2) -> 'SHORT';
get_field_datatype(3) -> 'LONG';
get_field_datatype(4) -> 'FLOAT';
get_field_datatype(5) -> 'DOUBLE';
get_field_datatype(6) -> 'NULL';
get_field_datatype(7) -> 'TIMESTAMP';
get_field_datatype(8) -> 'LONGLONG';
get_field_datatype(9) -> 'INT24';
get_field_datatype(10) -> 'DATE';
get_field_datatype(11) -> 'TIME';
get_field_datatype(12) -> 'DATETIME';
get_field_datatype(13) -> 'YEAR';
get_field_datatype(14) -> 'NEWDATE';
get_field_datatype(16) -> 'BIT';
get_field_datatype(246) -> 'DECIMAL';
get_field_datatype(247) -> 'ENUM';
get_field_datatype(248) -> 'SET';
get_field_datatype(249) -> 'TINYBLOB';
get_field_datatype(250) -> 'MEDIUM_BLOG';
get_field_datatype(251) -> 'LONG_BLOG';
get_field_datatype(252) -> 'BLOB';
get_field_datatype(253) -> 'VAR_STRING';
get_field_datatype(254) -> 'STRING';
get_field_datatype(255) -> 'GEOMETRY'.
%%--------------------------------------------------------------------
%% Function: get_option(Key1, Options, Default) -> Value1
%% Options = [Option]
%% Option = {Key2, Value2}
%% Key1 = Key2 = atom()
%% Value1 = Value2 = Default = term()
%% Descrip.: Return the option associated with Key passed to squery/4
%%--------------------------------------------------------------------
get_option(Key, Options, Default) ->
case lists:keysearch(Key, 1, Options) of
{value, {_, Value}} ->
Value;
false ->
Default
end.

165
src/mysql/mysql_recv.erl Normal file
View File

@ -0,0 +1,165 @@
%%%-------------------------------------------------------------------
%%% File : mysql_recv.erl
%%% Author : Fredrik Thulin <ft@it.su.se>
%%% Descrip.: Handles data being received on a MySQL socket. Decodes
%%% per-row framing and sends each row to parent.
%%%
%%% Created : 4 Aug 2005 by Fredrik Thulin <ft@it.su.se>
%%%
%%% Note : All MySQL code was written by Magnus Ahltorp, originally
%%% in the file mysql.erl - I just moved it here.
%%%
%%% Copyright (c) 2001-2004 Kungliga Tekniska Högskolan
%%% See the file COPYING
%%%
%%% Signals this receiver process can send to it's parent
%%% (the parent is a mysql_conn connection handler) :
%%%
%%% {mysql_recv, self(), data, Packet, Num}
%%% {mysql_recv, self(), closed, {error, Reason}}
%%% {mysql_recv, self(), closed, normal}
%%%
%%% Internally (from inside init/4 to start_link/4) the
%%% following signals may be sent to the parent process :
%%%
%%% {mysql_recv, self(), init, {ok, Sock}}
%%% {mysql_recv, self(), init, {error, E}}
%%%
%%%-------------------------------------------------------------------
-module(mysql_recv).
%%--------------------------------------------------------------------
%% External exports (should only be used by the 'mysql_conn' module)
%%--------------------------------------------------------------------
-export([start_link/4
]).
-record(state, {
socket,
parent,
log_fun,
data
}).
-define(SECURE_CONNECTION, 32768).
-define(CONNECT_TIMEOUT, 5000).
%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Host, Port, LogFun, Parent)
%% Host = string()
%% Port = integer()
%% LogFun = undefined | function() of arity 3
%% Parent = pid(), process that should get received frames
%% Descrip.: Start a process that connects to Host:Port and waits for
%% data. When it has received a MySQL frame, it sends it to
%% Parent and waits for the next frame.
%% Returns : {ok, RecvPid, Socket} |
%% {error, Reason}
%% RecvPid = pid(), receiver process pid
%% Socket = term(), gen_tcp socket
%% Reason = atom() | string()
%%--------------------------------------------------------------------
start_link(Host, Port, LogFun, Parent) when is_list(Host), is_integer(Port) ->
RecvPid =
spawn_link(fun () ->
init(Host, Port, LogFun, Parent)
end),
%% wait for the socket from the spawned pid
receive
{mysql_recv, RecvPid, init, {error, E}} ->
{error, E};
{mysql_recv, RecvPid, init, {ok, Socket}} ->
{ok, RecvPid, Socket}
after ?CONNECT_TIMEOUT ->
catch exit(RecvPid, kill),
{error, "timeout"}
end.
%%====================================================================
%% Internal functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init((Host, Port, LogFun, Parent)
%% Host = string()
%% Port = integer()
%% LogFun = undefined | function() of arity 3
%% Parent = pid(), process that should get received frames
%% Descrip.: Connect to Host:Port and then enter receive-loop.
%% Returns : error | never returns
%%--------------------------------------------------------------------
init(Host, Port, LogFun, Parent) ->
case gen_tcp:connect(Host, Port, [binary, {packet, 0}]) of
{ok, Sock} ->
Parent ! {mysql_recv, self(), init, {ok, Sock}},
State = #state{socket = Sock,
parent = Parent,
log_fun = LogFun,
data = <<>>
},
loop(State);
E ->
mysql:log(LogFun, error,
"mysql_recv: Failed connecting to ~p:~p : ~p",
[Host, Port, E]),
Msg = lists:flatten(io_lib:format("connect failed : ~p", [E])),
Parent ! {mysql_recv, self(), init, {error, Msg}}
end.
%%--------------------------------------------------------------------
%% Function: loop(State)
%% State = state record()
%% Descrip.: The main loop. Wait for data from our TCP socket and act
%% on received data or signals that our socket was closed.
%% Returns : error | never returns
%%--------------------------------------------------------------------
loop(State) ->
Sock = State#state.socket,
receive
{tcp, Sock, InData} ->
NewData = list_to_binary([State#state.data, InData]),
%% send data to parent if we have enough data
Rest = sendpacket(State#state.parent, NewData),
loop(State#state{data = Rest});
{tcp_error, Sock, Reason} ->
mysql:log(State#state.log_fun, error, "mysql_recv: "
"Socket ~p closed : ~p", [Sock, Reason]),
State#state.parent ! {mysql_recv, self(), closed,
{error, Reason}},
error;
{tcp_closed, Sock} ->
mysql:log(State#state.log_fun, debug, "mysql_recv: "
"Socket ~p closed", [Sock]),
State#state.parent ! {mysql_recv, self(), closed, normal},
error
end.
%%--------------------------------------------------------------------
%% Function: sendpacket(Parent, Data)
%% Parent = pid()
%% Data = binary()
%% Descrip.: Check if we have received one or more complete frames by
%% now, and if so - send them to Parent.
%% Returns : Rest = binary()
%%--------------------------------------------------------------------
%% send data to parent if we have enough data
sendpacket(Parent, Data) ->
case Data of
<<Length:24/little, Num:8, D/binary>> ->
if
Length =< size(D) ->
{Packet, Rest} = split_binary(D, Length),
Parent ! {mysql_recv, self(), data, Packet, Num},
sendpacket(Parent, Rest);
true ->
Data
end;
_ ->
Data
end.