gen_java: easy java for erlang
Introduction
- I write Erlang at CHEF
- I used to write Java
Analytics at Chef
Stream processing of all data that flows through
Chef Server.
Used on clusters of up to 40,000 nodes!
Sends alerts to various endpoints
when various things happen
- Hipchat
- SMTP
- Webhooks
- More Coming Soon!
Want more?
BUY MY CONFIGURATION MANAGEMENT INFRASTRUCTURE
CHEF.io | Analytics Documentation
COME TO CHEFCONF: 3/31 - 4/2 Santa Clara
Alaska Pipeline
Apache Storm Pipeline
It's a pipeline, Alaska has pipelines,
so we called it Alaska
You write rules Rule Documentation
rules 'PCI 2.3 – Confirm telnet port not available'
  rule on run_control
  when
    name = 'telnet not listening' and
    resource_type = 'port' and
    resource_name = '23' and
    status != 'success'
  then
    audit:error("PCI 2.3 - Encrypt all non-console \
                 administrative access such as \
                 browser/Web-based management tools.")
    notify("security-team@financialcorp.com", "{{run.node_name}} is listening for \
           connections on port 23/telnet!")
  end
end
notify(X) will use a different set of definitions for what those messages contain.
Parsing Rules
Alaska Rules, an ANTLR grammar for Java
Events processed by Apache Storm pipeline
Rule syntax based on a subset of Complex Event Processing (CEP)
More info on that:
Configuration Web Service
- Erlang
- Webmachine REST Framework
- Sqerl Lightweight ORM on top of epgsql
Validating Rules
Dave likes writing parsers, so he gave us
Erlaska Rules
Neotoma Parser
Neotoma is a packrat parser-generator for Erlang for Parsing Expression Grammars (PEGs).
The important thing is that its grammar format is different from ANTLR's.
erlaska_rules only ever validated syntax, whereas alaska_rules is an actual compiler that generates code to evaluate in the pipeline
erlaska_rules.erl
erlaska_rules is a module generated by the neotoma project. Once we have that parser, validating rules from webmachine was as easy as:
%% inside malformed_request/2
case erlaska_rules:parse(Rule) of
    true ->
        {false, Req, State#state{rule=Rule}};
    {false, _Reason} ->
        {true, Req, State}
end;
Problem?
This worked fine at first, but every change to the grammar had to be duplicated in both parsers. In practice we never even got that far: the two grammars never reached 100% compatibility.
What If?
We could call the Java parser from Erlang?
We've already got the ANTLR grammar, which is the definitive truth for correctness of rules anyway. If we could use that, we cut our work in half.
Even though Dave loves parsers.
The easy way
We could have just made a Java command-line tool for parsing rules, but that seemed like too much of a hack.
Wait
I've run Java from Erlang before with Riak_JMX. If you have to do something twice, it's time to make it generic.
Let's Do More
But actually, I'm doing something new here. What I really want to do is send Java an rpc:call and have Erlang not really even care that Java is involved.
JInterface
It turns out we've had this for a while.
It understands the ideas of:
- Nodes
- EPMD
- Erlang Datatypes
- Process Messages
No RPC, No Problem
Note: OTP source links will all be to the tag R16B03-1
I already knew that RPC calls were handled by a process called `rex`, so I started digging around the Erlang source for it
%% In the source for rpc.erl
-define(NAME, rex).
do_call(Node, Request, Timeout) ->
    %% ...
    Result = gen_server:call({?NAME,Node}, Request, Timeout),
So, what's `Request` look like?
It's passed in to rpc:do_call, so let's look at rpc:call, which calls it.
call(N,M,F,A,infinity) when node() =:= N ->
    %% Optimize local call
    local_call(M,F,A);
call(N,M,F,A,infinity) ->
    do_call(N, {call,M,F,A,group_leader()}, infinity);
call(N,M,F,A,Timeout) when is_integer(Timeout), Timeout >= 0 ->
    do_call(N, {call,M,F,A,group_leader()}, Timeout).
do_call
- Some RPC magic we don't need to worry about
- what we do care about is that it calls gen_server:call
There's some pretty nifty stuff in there about spawning monitors and trapping exits, but it's not really relevant to what we're doing here
Request
Request = { call :: atom(), Module :: atom(), Function :: atom(), Arguments :: [any()], GroupLeader :: pid() }
But wait, there's more
That's not all Erlang would be sending to another node. Let's dig into the gen_server:call
call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$gen_call', Request, Timeout) of
        {ok,Res} ->
            Res;
        {'EXIT',Reason} ->
            exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
    end.
The rabbit hole goes deeper.
WARNING: rpc is in kernel, but gen_server is in stdlib if you're digging in source
gen:call
Source: gen:call
%% deep in gen:do_call, which is called by gen:call
erlang:send(Process,
            {Label, {self(), Mref}, Request},  %% <- THIS!
            [noconnect])
Jackpot! The second argument to erlang:send/3 is our message! The actual message being sent is a 3-tuple, {Label, {self(), Mref}, Request}, where Label is the '$gen_call' that gen_server:call passed in.
So, here's the path
rpc:call(Node, M,F,A, T) ->
  rpc:do_call(Node, {call,M,F,A,GL}, T) ->
  gen_server:call({rex,Node}, {call,M,F,A,GL}, T) ->
  gen:call({rex,Node}, '$gen_call', {call,M,F,A,GL}, T) ->
  gen:do_call({rex,Node}, '$gen_call', {call,M,F,A,GL}, T) ->
  erlang:send({rex,Node}, {'$gen_call', {self(), Mref}, {call,M,F,A,GL}}).
  %%% ^^^ JACKPOT!
1st element: ID
'$gen_call'
2nd element: Return Address
{ From :: pid(), MRef :: ref() }
The From pid could be waiting for a bunch of replies. MRef lets it know which request a reply belongs to.
3rd element: RPC Request
Request from above
Request = { call :: atom(), Module :: atom(), Function :: atom(), Arguments :: [any()], GroupLeader :: pid() }
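In Erlang, destructuring that whole message takes one pattern match. Below is a minimal Java sketch of the same destructuring. To stay self-contained it does not use JInterface. Instead it assumes a toy term model: tuples are `Object[]`, lists are `java.util.List`, atoms are a hypothetical `Atom` class, and pids and refs are opaque objects. `RexCall` and its field names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

public final class RexCall {
    /** Hypothetical stand-in for an Erlang atom (JInterface has OtpErlangAtom). */
    public static final class Atom {
        public final String name;

        public Atom(String name) {
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Atom && ((Atom) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }

        @Override
        public String toString() {
            return "'" + name + "'";
        }
    }

    public final Object fromPid;     // where the reply goes
    public final Object ref;         // echoed back so the caller can match the reply
    public final String module;
    public final String function;
    public final List<?> args;
    public final Object groupLeader;

    private RexCall(Object fromPid, Object ref, String module, String function,
                    List<?> args, Object groupLeader) {
        this.fromPid = fromPid;
        this.ref = ref;
        this.module = module;
        this.function = function;
        this.args = args;
        this.groupLeader = groupLeader;
    }

    /** Matches {'$gen_call', {FromPid, Ref}, {call, M, F, Args, GroupLeader}}. */
    public static RexCall parse(Object msg) {
        Object[] outer = tuple(msg, 3, "rex message");
        expectAtom(outer[0], "$gen_call");
        Object[] from = tuple(outer[1], 2, "'from' tuple");
        Object[] call = tuple(outer[2], 5, "'call' tuple");
        expectAtom(call[0], "call");
        if (!(call[1] instanceof Atom) || !(call[2] instanceof Atom) || !(call[3] instanceof List)) {
            throw new IllegalArgumentException("bad M, F or A in " + Arrays.deepToString(call));
        }
        return new RexCall(from[0], from[1], ((Atom) call[1]).name, ((Atom) call[2]).name,
                (List<?>) call[3], call[4]);
    }

    /** Checks that term is a tuple (Object[]) of the expected arity. */
    private static Object[] tuple(Object term, int arity, String what) {
        if (!(term instanceof Object[]) || ((Object[]) term).length != arity) {
            throw new IllegalArgumentException(what + " should be a " + arity + "-tuple");
        }
        return (Object[]) term;
    }

    private static void expectAtom(Object term, String name) {
        if (!new Atom(name).equals(term)) {
            throw new IllegalArgumentException("expected '" + name + "', got " + term);
        }
    }
}
```

Every failed check becomes an `IllegalArgumentException`, which is roughly the role the plain `Exception`s play in the JInterface version.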
Now we know
what Erlang sends to other erlang nodes for rpc:call
Knowing is half the battle!
Setting up the Java Side
JInterface gives us Node for free, so we can just set something up to listen for messages
import com.ericsson.otp.erlang.*;
public static void main(String[] stringArgs) throws Exception {
    String nodename = stringArgs[0];
    String cookie = stringArgs[1];
    OtpNode self = new OtpNode(nodename, cookie);
    OtpMbox rex = self.createMbox("rex");
    while (true) {
        // rex.receive is a blocking call,
        // so just hang out here until one shows up
        OtpErlangObject o = rex.receive();
        System.out.println("Rex received '" + o.toString() + "'");
    }
}
The simplest of Java nodes: it just opens up a `rex` mailbox and waits for messages. Any rpc:call to this node will just print its content to stdout. The caller never gets a reply, though.
Deserialization in Java
This is where we start missing pattern matching. It takes about 50 lines of Java to parse out the 3-tuple that gen:do_call is sending over, and that's with exception handling abstracted out.
Validate Arity
OtpErlangTuple rexCall = (OtpErlangTuple)o;
int arity = rexCall.arity();
if (arity != 3) {
    throw new Exception(
        "Rex message has invalid arity. expected 3, got " + arity);
}
Validate gen_call as first element:
Remember the 1st element? '$gen_call'
OtpErlangAtom gen_call = (OtpErlangAtom)(rexCall.elementAt(0));
String gen_call_string = gen_call.atomValue();
if (!gen_call_string.equals("$gen_call")) {
    throw new Exception(
        "Rex message should start with '$gen_call': " + o.toString());
}
Validate second element: {Pid::pid, Ref::ref}
OtpErlangTuple fromTuple = (OtpErlangTuple)(rexCall.elementAt(1));
int fromArity = fromTuple.arity();
if (fromArity != 2) {
    throw new Exception(
        "Rex message's 'from' tuple should have 2 elements, has " +
        fromArity + ": " + o.toString());
}
this.fromPid = (OtpErlangPid)(fromTuple.elementAt(0));
this.fromRef = (OtpErlangRef)(fromTuple.elementAt(1));
Validate the call tuple:
{call::atom(), Mod::atom(), Fun::atom(), Args::list(), GroupLeader::pid()}
OtpErlangTuple callTuple = (OtpErlangTuple)(rexCall.elementAt(2));
int callArity = callTuple.arity();
if (callArity != 5) {
    throw new ErlangRemoteException(this.fromPid, this.fromRef,
        "Rex message's 'call' tuple should have 5 elements, has " +
        callArity + ": " + o.toString());
}
OtpErlangAtom callAtom = (OtpErlangAtom)(callTuple.elementAt(0));
String callString = callAtom.atomValue();
if (!callString.equals("call")) {
    throw new ErlangRemoteException(this.fromPid, this.fromRef,
        "Rex message's call block should start with 'call', but it's : " +
        callString);
}
Validate M,F,A
try {
    this.mfa = new ErlangModFunArgs(
        (OtpErlangAtom)(callTuple.elementAt(1)),
        (OtpErlangAtom)(callTuple.elementAt(2)),
        (OtpErlangList)(callTuple.elementAt(3)));
    this.remoteGroupLeaderPid = (OtpErlangPid)(callTuple.elementAt(4));
} catch (Exception e) {
    throw new ErlangRemoteException(this.fromPid, this.fromRef, e);
}
Exception Handling: toErlangException
Source: ErlangRemoteException.java
turns exceptions into {error, "Message"}
public static OtpErlangObject toErlangException(Exception e) {
    OtpErlangObject[] elements = new OtpErlangObject[2];
    elements[0] = new OtpErlangAtom("error");
    elements[1] = new OtpErlangString(e.getMessage());
    return new OtpErlangTuple(elements);
}
Exception Handling: send
send knows just enough about erlang/rex to send an error message back to rpc:call
We haven't looked at what the reply should look like yet! Fortunately it's right there in gen:do_call.
It's waiting for a
{ref(), Reply}
So we send
public void send(OtpMbox mbox) {
    OtpErlangObject[] elements = new OtpErlangObject[2];
    elements[0] = this.fromRef;
    elements[1] = toErlangException(this); // {error, Message}
    mbox.send(this.fromPid, new OtpErlangTuple(elements));
}
But, sometimes not.
As you may have noticed, we don't start using ErlangRemoteException until after we've read in the second element, the From tuple. Only then do we know enough about the sender to know where to send the reply. Before that, we just throw regular exceptions. We'll catch both types when we process incoming messages. If we don't know how to respond, we'll just dump the output to the console, which we'll teach the Erlang side to monitor.
try/catch
Java incoming message processing
ErlangRemoteProcedureCallMessage msg = null;
try {
    msg = new ErlangRemoteProcedureCallMessage(rex, o);
} catch (ErlangRemoteException erlE) {
    erlE.send(rex);
} catch (Exception e) {
    System.out.println("Rex received '" + o.toString() +
        "' but didn't know how to process it. Exception: " + e.getMessage());
}
Back to the Erlang side
The gen_java module
- It's a gen_server
- Starts a jar of your choosing!
- When you build that jar, include gen_java.jar
The gen_java project structure
- src/main/java <- maven will build a jar with this
- src/main/erlang <- rebar will use this
At least it's not McRib
Starting the gen_java server
- Opens a port running your jar in the JVM
Basic Handshake
Fetch = fun() ->
            X = rpc:call(Nodename, erlang, node, [], 10000),
            Nodename =:= X
        end,
case wait_until(Fetch, 20, 1000) of
    ok ->
        rpc:call(Nodename, erlang, link, [self()]),
        erlang:monitor_node(Nodename, true),
        init_callback(State#gen_java_state{port = Port, pid = Pid});
    timeout ->
        {stop, timeout}
end
Figure 2: that's so fetch
Handshake: What just happened?
- keeps rpc calling erlang:node/0 until it gets the expected node name back
- if it never does, stop the server; otherwise
- link the java node back to the server's process
- monitor the java node
- init_callback?
After we've started, there's a callback that lets you run some start up java code before we start accepting rpc:calls
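The first handshake step, polling until the Java node answers, is just a bounded retry loop. `wait_until/3` itself isn't shown in the slides. Below is a hypothetical Java rendering of the same idea. It assumes `wait_until(Fetch, 20, 1000)` means "up to 20 attempts, 1000 ms apart", and the `WaitUntil` name is made up.

```java
import java.util.function.BooleanSupplier;

public final class WaitUntil {
    /**
     * Polls check up to `retries` times, sleeping delayMs between attempts.
     * Returns true as soon as check succeeds, false if every attempt fails
     * (the handshake above maps that case to {stop, timeout}).
     */
    public static boolean waitUntil(BooleanSupplier check, int retries, long delayMs) {
        for (int attempt = 1; attempt <= retries; attempt++) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (attempt < retries) {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return false;
                }
            }
        }
        return false;
    }
}
```

Usage would look like `waitUntil(() -> expectedName.equals(fetchNodeName()), 20, 1000)`, where `fetchNodeName` is a hypothetical stand-in for the `rpc:call(Nodename, erlang, node, [])` probe.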
Error logging
handle_info({Port, {data, {_Type, Data}}},
            #gen_java_state{port = Port, module = M} = State) ->
    lager:info("[gen_java][~p] ~s", [M, Data]),
    {noreply, State};
Now that we've got a port running this JVM, anything the Java code prints with System.out.println will end up in your Erlang application's log.
Recap
We're sending rpc:calls to the java node
we can send error messages back
- console
- rpc responses
So, what do we do with actual rpc calls?
The Easy Way : Hard Coded
There are some things we just want every Java node to be able to do:
Needed by our Handshake
- erlang:node/0
- erlang:link/1
POC Methods
- erlang:abs/1, twice (one for OtpErlangDouble, one for OtpErlangLong)
Nice for JVM inspection
- java:system_properties/0
- java:system_env/0
- java:input_arguments/0
WTF is the java module?!
I made it up. I made the erlang module up too. Java doesn't have these
Let's talk about how we map erlang MFAs
All Others
- must be public static Java methods
- all argument and return types must be classes provided by JInterface
- since Java reflection is a bit expensive, we cache the Method objects.
Initializing the RPC Method Cache
Map<ErlangFunctionCacheKey, Method> RPCCache =
    new HashMap<ErlangFunctionCacheKey, Method>();
RPCCache.put(
    new ErlangFunctionCacheKey("erlang", "abs", OtpErlangDouble.class),
    Erlang.class.getMethod("abs", OtpErlangDouble.class));
RPCCache.put(
    new ErlangFunctionCacheKey("erlang", "abs", OtpErlangLong.class),
    Erlang.class.getMethod("abs", OtpErlangLong.class));
The last argument to ErlangFunctionCacheKey is a variable-length list of argument classes.
dat java module
// wrapper for java.lang.System.getProperties()
RPCCache.put(
    new ErlangFunctionCacheKey("java", "system_properties"),
    Java.class.getMethod("system_properties"));
RPCCache.put(
    new ErlangFunctionCacheKey("java", "system_env"),
    Java.class.getMethod("system_env"));
RPCCache.put(
    new ErlangFunctionCacheKey("java", "input_arguments"),
    Java.class.getMethod("input_arguments"));
ACHIEVEMENT UNLOCKED: Java dot java
public static OtpErlangList system_properties() {
    List<OtpErlangTuple> l = new ArrayList<OtpErlangTuple>();
    Iterator<Map.Entry<Object, Object>> it =
        System.getProperties().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<Object, Object> i = it.next();
        OtpErlangObject[] elems = new OtpErlangObject[2];
        elems[0] = new OtpErlangAtom(i.getKey().toString());
        elems[1] = new OtpErlangBinary(i.getValue().toString().getBytes());
        OtpErlangTuple t = new OtpErlangTuple(elems);
        l.add(t);
    }
    return new OtpErlangList(l.toArray(new OtpErlangObject[0]));
}
java:system_properties()
(erlang@127.0.0.1)1> net_adm:ping('java@127.0.0.1'). pong (erlang@127.0.0.1)2> rpc:call('java@127.0.0.1', java, system_properties, []). [{'java.runtime.name',<<"Java(TM) SE Runtime Environment">>}, {'sun.boot.library.path',<<"/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre/lib">>}, {'java.vm.version',<<"24.71-b01">>}, {gopherProxySet,<<"false">>}, {'java.vm.vendor',<<"Oracle Corporation">>}, {'java.vendor.url',<<"http://java.oracle.com/">>}, {'path.separator',<<":">>}, {'java.vm.name',<<"Java HotSpot(TM) 64-Bit Server VM">>}, {'file.encoding.pkg',<<"sun.io">>}, {'user.country',<<"US">>}, {'sun.java.launcher',<<"SUN_STANDARD">>}, {'sun.os.patch.level',<<"unknown">>}, {'java.vm.specification.name',<<"Java Virtual Machine Specification">>}, {'java.runtime.version',<<"1.7.0_71-b14">>}, {'java.awt.graphicsenv',<<"sun.awt.CGraphicsEnvironment">>}, {'java.endorsed.dirs',<<"/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre/lib/endorsed">>}, {'os.arch',<<"x86_64">>}, {'java.io.tmpdir',<<"/var/folders/hl/zf_j1bvs7_b18rj7bbsm35p00000gp/T/">>}, {'line.separator',<<"\n">>}, {'java.vm.specification.vendor',<<"Oracle Corporation">>}, {'os.name',<<"Mac OS X">>}, {'sun.jnu.encoding',<<"UTF-8">>}, {'java.library.path',<<"/System/Library/Java/Extensions:/usr/lib/java:.">>}, {'java.specification.name',<<"Java Platform API Specification">>}, {'java.class.version',<<"51.0">>}, {'sun.management.compiler',<<"HotSpot 64-Bit Tiered Compilers">>}, {'os.version',<<"10.10.2">>}, {'http.nonProxyHosts',<<"local|*.local|169.254/16|*.169.254/16">>}, {'user.timezone',<<>>}, {'java.awt.printerjob',<<"sun.lwawt.macosx.CPrinterJob">>}, {'file.encoding',<<"UTF-8">>}, {'java.specification.version',<<"1.7">>}, {'java.class.path',<<"target/gen_java-0.1.2-SNAPSHOT-jar-with-dependencies.jar">>}, {'java.vm.specification.version',<<"1.7">>}, {'sun.java.command',<<"com.devivo.gen_java.ErlangServer java@127.0.0.1 cookie 10">>}, 
{'java.home',<<"/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre">>}, {'sun.arch.data.model',<<"64">>}, {'user.language',<<"en">>}, {'java.specification.vendor',<<"Oracle Corporation">>}, {'awt.toolkit',<<"sun.lwawt.macosx.LWCToolkit">>}, {'java.vm.info',<<"mixed mode">>}, {'java.version',<<"1.7.0_71">>}, {'java.vendor',<<"Oracle Corporation">>}, {'file.separator',<<"/">>}, {'java.vendor.url.bug',<<"http://bugreport.sun.com/bugreport/">>}, {'sun.io.unicode.encoding',<<"UnicodeBig">>}, {'sun.cpu.endian',<<"little">>}, {socksNonProxyHosts,<<"local|*.local|169.254/16|*.169.254/16">>}, {'ftp.nonProxyHosts',<<"local|*.local|169.254/16|*.169.254/16">>}, {'sun.cpu.isalist',<<>>}]
What about your own methods?
Module: Full Java Class Name
Function: Java Method Name
Args: ARGS!
Caching?
if (RPCCache.containsKey(msg.getMFA().getKey())) {
    Method m = RPCCache.get(msg.getMFA().getKey());
    msg.setMethod(m);
    pool.execute(msg);
} else {
    //// This means it's not in the cache, we should
    //// try and find it and add it.
    Method m = find(msg.getMFA().getKey());
    if (m != null) {
        RPCCache.put(msg.getMFA().getKey(), m);
        msg.setMethod(m);
        pool.execute(msg);
    } else {
        System.out.println("Bad RPC: " + msg.getMFA().getKey().toString());
        //// we couldn't add it, be nice and send a badrpc error back
        msg.send(msg.toErlangBadRPC());
    }
}
msg.toErlangBadRPC()
% Bad RPC calls look like this:
{badrpc,{'EXIT',{undef,[{Module,Fun,[],[]},
                        {rpc,'-handle_call_call/6-fun-0-',5,
                             [{file,"rpc.erl"},{line,205}]}]}}}
So we construct that tuple as a response and send it.
Caching Payoff!
Reflection is only done once per method.
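The lookup-then-cache flow above can be sketched with nothing but `java.lang.reflect`. This is a hypothetical `RpcMethodCache`, not gen_java's real class. Following the "Module: Full Java Class Name, Function: Java Method Name" mapping, it keys the cache on module, function, and the runtime classes of the arguments. Exact runtime classes work as a key because `getMethod` needs exact parameter types, which is one reason the concrete JInterface classes make good argument types. Misses are deliberately not cached, and the hard-coded `erlang`/`java` modules are left out.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public final class RpcMethodCache {
    /** Cache key: Erlang module (Java class name), function (method name), argument classes. */
    private static final class Key {
        final String module;
        final String function;
        final List<Class<?>> argTypes;

        Key(String module, String function, Class<?>[] argTypes) {
            this.module = module;
            this.function = function;
            this.argTypes = Arrays.asList(argTypes);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return module.equals(k.module) && function.equals(k.function)
                    && argTypes.equals(k.argTypes);
        }

        @Override
        public int hashCode() {
            return Objects.hash(module, function, argTypes);
        }
    }

    private final Map<Key, Method> cache = new ConcurrentHashMap<>();
    private final AtomicInteger lookups = new AtomicInteger();

    /** Number of reflective lookups performed so far (cache misses). */
    public int lookupCount() {
        return lookups.get();
    }

    /** Resolves Module:Function(Args) to a public static method and invokes it. */
    public Object call(String module, String function, Object... args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        Key key = new Key(module, function, types);
        Method m = cache.get(key);
        if (m == null) {
            m = find(key);
            if (m == null) {
                // This is the point where gen_java replies with {badrpc, ...}.
                throw new IllegalArgumentException(
                        "undef: " + module + ":" + function + "/" + args.length);
            }
            cache.put(key, m); // only successful lookups are cached
        }
        try {
            return m.invoke(null, args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The expensive part: reflection, done once per distinct key. */
    private Method find(Key key) {
        lookups.incrementAndGet();
        try {
            Method m = Class.forName(key.module)
                    .getMethod(key.function, key.argTypes.toArray(new Class<?>[0]));
            return Modifier.isStatic(m.getModifiers()) ? m : null;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return null;
        }
    }
}
```

For example, `call("java.lang.Integer", "parseInt", "42")` reflects once, and later `parseInt` calls with a `String` argument are served from the cache.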
We're aiming for the pool, right?
pool.execute(msg);
We went ahead and added some thread pooling on the Java side.
Otherwise all the processing would happen in one place. What if you asked it to do hard things?
pool.execute() hands the message to a worker thread. The message's run() method is where we invoke the method, package up its return value, and send it back to Erlang.
public void run() {
    OtpErlangObject result = new OtpErlangAtom("null");
    try {
        result = (OtpErlangObject) this.method.invoke(
            null, getMFA().getArgs().elements());
    } catch (Exception e) {
        //// This could "technically" throw an InvocationTargetException
        //// or an IllegalAccessException. We'll write defensive code
        //// for that eventually
        System.out.println(e.getClass().getName() + " : " + e.getMessage());
        result = error(e.getClass().getName() + " : " + e.getMessage());
    }
    this.send(result);
}
Wrapping Responses
public void send(OtpErlangObject resp) {
    this.rex.send(this.fromPid, wrapResponse(resp));
}
public OtpErlangTuple wrapResponse(OtpErlangObject resp) {
    OtpErlangObject[] elements = new OtpErlangObject[2];
    elements[0] = this.fromRef;
    elements[1] = resp;
    return new OtpErlangTuple(elements);
}
this.send makes sure to send it to the right place
wrapResponse makes sure to include that ref() we need for RPC
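Put together, the pool, run(), and wrapResponse boil down to two steps: run the call on a worker thread, then reply `{Ref, Result}`. Here is a stdlib-only sketch of that flow. It assumes a `BlockingQueue` stands in for the caller's mailbox, tuples are modeled as `List`s, and atoms are plain strings. `PooledRpc` and its methods are hypothetical names, not gen_java's API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class PooledRpc {
    private final ExecutorService pool;

    public PooledRpc(int threadCount) {
        // Daemon threads, so an idle pool never keeps the JVM alive on its own.
        this.pool = Executors.newFixedThreadPool(threadCount, r -> {
            Thread t = new Thread(r, "rpc-worker");
            t.setDaemon(true);
            return t;
        });
    }

    /**
     * Runs the call on a worker thread and delivers {Ref, Result} to the caller's
     * mailbox; a thrown exception becomes {Ref, {error, "Class : message"}}.
     */
    public void execute(Object ref, Callable<Object> work, BlockingQueue<List<Object>> replyTo) {
        pool.execute(() -> {
            Object result;
            try {
                result = work.call();
                if (result == null) {
                    result = "null"; // modeled on the atom null that run() starts from
                }
            } catch (Exception e) {
                result = List.of("error", e.getClass().getName() + " : " + e.getMessage());
            }
            replyTo.add(List.of(ref, result)); // like wrapResponse: tag with the caller's ref
        });
    }

    public void shutdown() {
        pool.shutdown();
    }
}
```

Replies can arrive in any order when several calls are in flight. That's exactly why each reply carries the caller's ref.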
Erlang Developer Experience
You might remember that I'm kind of a user experience nut
Your Java Module
-module(my_java).
-compile({parse_transform, gen_java_parse_transform}).
Your sys.config
[{gen_java, [
{modules, [
{my_java, [
{jar, "/path/to/my.jar"},
{thread_count, 10}
]}
]}
]}
].
Your Supervisor
start it with my_java:start_link/0 or
{my_java, {my_java, start_link, []}, permanent, 5000, worker, [my_java]},
start_link/0?! Parse Transform
wrappers for gen_java functions
17 = my_java:call(erlang, abs, [-17]).
<<"your heart's desire">> = my_java:call('com.my.package','myMethod',[]).
5 Functions for FREE
The parse transform just takes the module's name and substitutes it into 5 functions:
-export([start_link/0, start/0, call/3, call/4, stop/0]).
stop() -> gen_java:stop(my_java).
call(Module, Function, Args, Timeout) ->
    gen_java:call(my_java, Module, Function, Args, Timeout).
call(Module, Function, Args) ->
    gen_java:call(my_java, Module, Function, Args).
start() -> gen_java:start(my_java).
start_link() -> gen_java:start_link(my_java).
init callback
Remember that? Put it here, and it'll get called right after the handshake.
-spec init(atom()) -> ok.
init(Nodename) ->
    SomeState = {some, thing, maybe_a_file_path},
    rpc:call(Nodename, 'com.yourcompany.package', 'init', [SomeState]),
    ok.
Adding convenience
-spec my_method(binary()) -> binary() | gen_java:badrpc().
my_method(Binary) ->
    call('com.my.package', 'myMethod', [Binary]).
Then using java in your app is as easy as
my_java:my_method(Binary).
Bringing it Back to CHEF Analytics
erlaska_rules is out!
alaska_rules.jar is in!
sys.config
[{gen_java, [
{modules, [
{alaska_rules, [
{jar, "priv/alaska_rules.jar"},
{thread_count, 10}
]}
]}
]}
].
alaska_rules.erl
-module(alaska_rules).
-compile({parse_transform, gen_java_parse_transform}).
-export([valid_rule/1, valid_rule_group/1, init/1]).
-spec valid_rule(binary()) -> true | {error, string()} | gen_java:badrpc().
valid_rule(Bin) ->
    call('com.chef.analytics.rules.erlang.RuleValidator', 'validRule', [Bin]).
-spec valid_rule_group(binary()) -> true | {error, string()} | gen_java:badrpc().
valid_rule_group(Bin) ->
    call('com.chef.analytics.rules.erlang.RuleValidator', 'validRuleGroup', [Bin]).
What do those java methods look like?
public static OtpErlangObject validRule(OtpErlangBinary ruleBin) {
    try {
        String ruleText = new String(ruleBin.binaryValue());
        Rule r = compiler.compile(ruleText);
        return new OtpErlangAtom(true);
    } catch (Exception e) {
        return ErlangRemoteException.toErlangException(e);
    }
}
public static OtpErlangObject validRuleGroup(OtpErlangBinary ruleGrpBin) {
    try {
        String ruleGrpText = new String(ruleGrpBin.binaryValue());
        RuleGroup rg = compiler.compileGroup(ruleGrpText);
        return new OtpErlangAtom(true);
    } catch (Exception e) {
        return ErlangRemoteException.toErlangException(e);
    }
}
init/1
We have some JSON schemas that alaska_rules.jar uses for validation of attributes.
init/1 reads them in as a list of binaries and then sends them over to the java node
init(Nodename) ->
    Dir = schema_dir(),
    JSONSchemas = filelib:wildcard(filename:join([Dir, "*.json"])),
    Schemas = [begin
                   {ok, Bin} = file:read_file(Filename),
                   {list_to_atom(filename:basename(Filename)), Bin}
               end || Filename <- JSONSchemas],
    rpc:call(Nodename,
             'com.chef.analytics.rules.erlang.RuleValidator',
             'setSchemas',
             [Schemas]),
    ok.
Bringing it back to WebMachine
case alaska_rules:valid_rule_group(nc_obj_rule:getval(rule, Rule)) of
    true ->
        lager:debug("malformed_request: rule syntax good"),
        {false, Req, State};
    {error, Msg} ->
        lager:debug("Invalid rule syntax: ~s", [Msg]),
        mf_return(Msg, [], Req, State);
    {badrpc, nodedown} ->
        lager:error("Alaska Rules node down, no validation possible"),
        NewReq = req_helper([
            {set_resp_header, ["content-type", "application/json"]},
            {set_resp_body, [jiffy:encode({[{error, <<"server side validation error">>}]})]}
        ], Req),
        {{halt, 500}, NewReq, State}
end.
Wrapping Up
All in all, this is just a wrapper for the hard stuff Erlang gave us for free. But what if they didn't?
Erlang Haskell Interface
Introducing Erlang Haskell Interface 0.2 github source
Erlang gives you zero Haskell for free
But somebody did: hackage erlang-0.1
What I got:
Erlang Types in Haskell
data ErlType = ErlNull
             | ErlInt Int
             | ErlBigInt Integer
             | ErlString String
             | ErlAtom String
             | ErlBinary [Word8]
             | ErlList [ErlType]
             | ErlTuple [ErlType]
             | ErlPid ErlType Int Int Int     -- node id serial creation
             | ErlPort ErlType Int Int        -- node id creation
             | ErlRef ErlType Int Int         -- node id creation
             | ErlNewRef ErlType Int [Word8]  -- node creation id
             deriving (Eq, Show)
Packing functions
putC = putWord8 . fromIntegral
putn = putWord16be . fromIntegral
putN = putWord32be . fromIntegral
puta = putByteString . B.pack
putA = putByteString . C.pack
getC = liftM fromIntegral getWord8
getn = liftM fromIntegral getWord16be
getN = liftM fromIntegral getWord32be
geta = liftM B.unpack . getByteString
getA = liftM C.unpack . getByteString
Half a Protocol
Looks like erlang-0.1 knew how to connect to an Erlang node from Haskell
It connected out from Haskell to an Erlang node, but I wanted the other direction: Erlang connecting in to a Haskell node.
Getting the old one working
nano-md5 dependency didn't work anymore, so replaced with PureMD5
The existing documentation wasn't great, but it might have been me.
Spinning up an Erlang node in Haskell
start nodename = do
  setupLoggers DEBUG
  infoM "Test" $ "Starting Node: " ++ nodename
  self <- createSelf nodename
  mbox <- createMBox self
  debugM "Test" $ "mbox: " ++ (show mbox)
  forever $ do
    rex_mbox <- createNamedMBox "rex" self
    forkIO $ rex nodename rex_mbox
    return ()
createSelf: Creating the Haskell Node
createSelf :: String -> IO Self
createSelf nodename = do
  inbox <- newEmptyMVar
  forkIO $ serve nodename inbox
  forkIO $ self nodename inbox
  node <- return . Self $ putMVar inbox
  nk_mbox <- createNamedMBox "net_kernel" node
  forkIO $ net_kernel nk_mbox
  return node
self is the thing that routes those messages
serve
serve is the function that connects to EPMD, opens up a listener, and then puts messages in an mbox.
Learning EPMD
Reserving a port
EPMD_ALIVE2_REQ
Open a socket with this request and keep it open… forever.
Here's the message EPMD expects (like every EPMD request, it is sent preceded by a 2-byte big-endian length):
Bytes | Content |
---|---|
1 | 120 |
2 | Port to reserve |
1 | 77 (means normal erlang node) |
1 | Protocol (0 = tcp/ipv4) |
2 | Highest version (5 = R6B and higher) |
2 | Lowest version (5 = R6B and higher) |
2 | Length in bytes of nodename field |
X | Nodename, X = ^^ |
2 | Length of Extras, we used 0 |
Y | Extras, length ^^, but we sent none |
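The table maps directly onto a big-endian byte buffer. Here is a minimal Java sketch of the same packet the Haskell code builds with `putC`/`putn`/`putA`, including the 2-byte length prefix. `Alive2Req` is a hypothetical name. The name field is the node name without the `@host` part, and the sketch sends no "Extra" bytes, as the slides do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class Alive2Req {
    static final int ALIVE2_REQ = 120;   // 'x'
    static final int NORMAL_NODE = 77;   // 'M', a normal (non-hidden) Erlang node
    static final int TCP_IPV4 = 0;
    static final int VERSION = 5;        // used for both highest and lowest version

    /** Builds the length-prefixed ALIVE2_REQ for nodeName (the part before '@'). */
    public static byte[] encode(String nodeName, int port) {
        if (port < 0 || port > 0xFFFF) {
            throw new IllegalArgumentException("port must fit in 2 bytes: " + port);
        }
        byte[] name = nodeName.getBytes(StandardCharsets.UTF_8);
        int bodyLen = 1 + 2 + 1 + 1 + 2 + 2 + 2 + name.length + 2;
        ByteBuffer buf = ByteBuffer.allocate(2 + bodyLen); // big-endian by default
        buf.putShort((short) bodyLen);   // every EPMD request starts with its length
        buf.put((byte) ALIVE2_REQ);
        buf.putShort((short) port);
        buf.put((byte) NORMAL_NODE);
        buf.put((byte) TCP_IPV4);
        buf.putShort((short) VERSION);   // highest version
        buf.putShort((short) VERSION);   // lowest version
        buf.putShort((short) name.length);
        buf.put(name);
        buf.putShort((short) 0);         // no "Extra" bytes
        return buf.array();
    }
}
```

For `encode("java", 7399)` the result is 19 bytes: a length prefix of 17, then `120`, the port as `0x1C 0xE7`, `77`, `0`, `0 5`, `0 5`, `0 4`, the four name bytes, and `0 0`.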
What's that look like?
Figure 4: Wiretap of ALIVE2_REQ
Figure 5: Bytes of ALIVE2_RESP
Haskell sends an EPMD_ALIVE2_REQ
epmdAlive2Req :: String -> Int -> IO ()
epmdAlive2Req node port = withEpmd $ \hdl -> do
  let msg = runPut $
        tag 'x' >>
        putn port >>
        putC 77 >>             -- node type
        putC 0 >>              -- protocol
        putn erlangVersion >>
        putn erlangVersion >>
        putn (length node) >>
        putA node >>
        putn 0                 -- "Extra" length, 0 for none
  let len = fromIntegral $ B.length msg
  let out = runPut $ putn len >> putLazyByteString msg
  forever $ do
    B.hPut hdl out
    hFlush hdl
    B.hGetContents hdl
  return ()
See that forever call? It just hangs out, letting EPMD know you still love it.
TIL: You can run `epmd -debug` to see what's coming across the wire through EPMD.
The Distribution Handshake
ALIVE2_REQ isn't even a quarter of the handshake.
We also have to do a back and forth over the port we're actually listening on
send_name            ------>  recv_name
recv_status          <------  send_status
send_status          ------>  recv_status
recv_challenge       <------  send_challenge
send_challenge_reply ------>  recv_challenge_reply
recv_challenge_ack   <------  send_challenge_ack
Figure 6: Here's an example of SEND_NAME
Let's gloss over this. If you want to see it, I did it here: Network.hs
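The one piece of crypto in those challenge steps is the MD5 digest, which is why the MD5 dependency mattered earlier. In the handshake version used here, each side proves it knows the cookie by sending MD5 of the cookie concatenated with the 32-bit challenge written as an unsigned decimal string. A minimal Java sketch follows. `ChallengeDigest` is a hypothetical name, and it assumes an ASCII/Latin-1 cookie.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class ChallengeDigest {
    /**
     * MD5(cookie ++ challenge-as-unsigned-decimal-text), the digest compared in
     * the send_challenge_reply / send_challenge_ack steps.
     */
    public static byte[] genDigest(int challenge, String cookie) {
        String input = cookie + Integer.toUnsignedString(challenge);
        try {
            return MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.ISO_8859_1));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }
}
```

The challenge is treated as unsigned, so a challenge whose 32-bit pattern is `-1` in Java is hashed as the text `4294967295`.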
the serve function: listening for erlang communication
Opens a socket on port X
Does the ALIVE2_REQ with port X to EPMD
Does the Distributed Erlang Handshake with the ErlNode
Opens up two way communication erl <-> hs
Routes any received messages to self via ErlDispatch
serve :: String -> MVar ErlMessage -> IO ()
serve nodename outbox = S.withSocketsDo $ do
  sock <- S.socket (S.addrFamily serveraddr) S.Stream S.defaultProtocol
  S.bindSocket sock (S.addrAddress serveraddr)
  port <- S.socketPort sock
  forkIO $ epmdAlive2Req nodename $ read $ show port
  S.listen sock 5
  -- Create a lock to use for synchronizing access to the handler
  lock <- newMVar ()
  -- Loop forever waiting for connections. Ctrl-C to abort.
  procRequests lock sock
procRequests: processing incoming socket connections from Erlang
procRequests :: MVar () -> S.Socket -> IO ()
procRequests lock mastersock = do
  (connsock, clientaddr) <- S.accept mastersock
  handleLog lock clientaddr $ B.pack "Foreign.Erlang.Server: client connected"
  forkIO $ procMessages lock connsock clientaddr
  procRequests lock mastersock
procMessages: processing messages from that socket
procMessages :: MVar () -> S.Socket -> S.SockAddr -> IO ()
procMessages lock connsock clientaddr = do
  connhdl <- S.socketToHandle connsock ReadWriteMode
  hSetBuffering connhdl NoBuffering
  (to, send, recv) <- erlConnectS connhdl nodename
  mvar <- newEmptyMVar
  forkIO $ nodeSend mvar send
  forkIO $ nodeRecv mvar recv outbox
  let node = putMVar mvar
  putMVar outbox $ ConnectedNode to node
nodeRecv: routing incoming messages
{- A `nodeRecv` thread is responsible for communication from an Erlang process.
   It receives messages from the network and dispatches them as appropriate. -}
nodeRecv mvar recv outbox = loop
  where
    loop = do
      (mctl, mmsg) <- recv
      case mctl of
        -- Nothing is a keepalive. All we want to do is echo it.
        Nothing -> putMVar mvar (Nothing, Nothing)
        -- A real message goes to self to be dispatched.
        Just ctl -> putMVar outbox $ ErlDispatch ctl (fromJust mmsg)
      loop
Funky Middle Syntax
Protocol between connected nodes
Turns out we need to figure out how to interpret Erlangy packets coming in now
Here's the distilled version of what they could be:
{1, FromPid, ToPid}                          %% LINK
{2, Cookie, ToPid}                           %% SEND
{3, FromPid, ToPid, Reason}                  %% EXIT
{4, FromPid, ToPid}                          %% UNLINK
{5}                                          %% NODE_LINK
{6, FromPid, Cookie, ToName}                 %% REG_SEND
{7, FromPid, ToPid}                          %% GROUP_LEADER
{8, FromPid, ToPid, Reason}                  %% EXIT2
{12, Cookie, ToPid, TraceToken}              %% SEND_TT
{16, FromPid, Cookie, ToName, TraceToken}    %% REG_SEND_TT
{18, FromPid, ToPid, TraceToken, Reason}     %% EXIT2_TT
{19, FromPid, ToProc, Ref}                   %% MONITOR_P
{20, FromPid, ToProc, Ref}                   %% DEMONITOR_P
{21, FromProc, ToPid, Ref, Reason}           %% MONITOR_P_EXIT
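Because the first element of every control tuple is an integer tag with a fixed tuple size, a receiver can classify and sanity-check a control message before routing it. Here is a hypothetical Java lookup table covering just the distilled list above. Control tuples are modeled as `Object[]`, and `ControlTag`/`classify` are made-up names.

```java
import java.util.HashMap;
import java.util.Map;

public enum ControlTag {
    LINK(1, 3), SEND(2, 3), EXIT(3, 4), UNLINK(4, 3), NODE_LINK(5, 1),
    REG_SEND(6, 4), GROUP_LEADER(7, 3), EXIT2(8, 4), SEND_TT(12, 4),
    REG_SEND_TT(16, 5), EXIT2_TT(18, 5), MONITOR_P(19, 4), DEMONITOR_P(20, 4),
    MONITOR_P_EXIT(21, 5);

    public final int tag;
    public final int arity; // size of the control tuple, including the tag itself

    ControlTag(int tag, int arity) {
        this.tag = tag;
        this.arity = arity;
    }

    private static final Map<Integer, ControlTag> BY_TAG = new HashMap<>();

    static {
        for (ControlTag t : values()) {
            BY_TAG.put(t.tag, t);
        }
    }

    /** Classifies a decoded control tuple and checks that its arity matches the tag. */
    public static ControlTag classify(Object[] ctl) {
        if (ctl.length == 0 || !(ctl[0] instanceof Integer)) {
            throw new IllegalArgumentException("control message must start with an integer tag");
        }
        ControlTag t = BY_TAG.get((Integer) ctl[0]);
        if (t == null) {
            throw new IllegalArgumentException("unknown control tag " + ctl[0]);
        }
        if (ctl.length != t.arity) {
            throw new IllegalArgumentException(t + " expects arity " + t.arity + ", got " + ctl.length);
        }
        return t;
    }
}
```

After classifying, a receiver would route SEND by ToPid and REG_SEND by ToName, which is what the Haskell self loop does with tags 2 and 6.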
Here's how the self process is handling them
Full Function: Processes.hs
I left a bunch of clauses off this slide
self :: String -> MVar ErlMessage -> IO ()
self nodename inbox = loop 1 [] [] []
  where
    loop id registered mboxes nodes = do
      msg <- takeMVar inbox
      debugM "Foreign.Erlang.Processes" $ "loop msg recv'd: " ++ (show msg)
      case msg of
        -- other cases omitted for clarity
        ErlDispatch ctl msg -> do
          case ctl of
            ErlTuple [ErlInt 2, _, pid] ->
              maybe (return ()) ($ msg) $ lookup pid mboxes
            ErlTuple [ErlInt 6, from, _, pid] ->
              maybe (return ()) ($ (ErlTuple [from, msg])) $ lookup pid registered
            _ -> return ()
          loop id registered mboxes nodes
        -- This clause is for when Erlang has connected to this node;
        -- we're just telling this node to add it to the connected nodes.
        ConnectedNode to node -> do
          case lookup to nodes of
            Just n -> loop id registered mboxes nodes
            Nothing -> loop id registered mboxes ((to, node):nodes)
        ErlStop -> return ()
net_kernel
-- This is the loop that receives erlang messages to the net_kernel
-- module. Without it, you can't ping this node
net_kernel mbox = do
  (ErlTuple [ from@(ErlPid (ErlAtom node) a b c),
              msg@(ErlTuple [_, ErlTuple [_, ref], _]) ]) <- mboxRecv mbox
  mboxSend mbox node (Left from) $ ErlTuple [ref, ErlAtom "yes"]
  net_kernel mbox
An rpc:call received by Haskell
erlang: rpc:call('haskell@127.0.0.1', 'mod', 'fun', ['args']).
ErlPid (ErlAtom "erlang@127.0.0.1") 38 0 2
ErlTuple [ErlAtom "$gen_call",
          ErlTuple [ErlPid (ErlAtom "erlang@127.0.0.1") 38 0 2,
                    ErlNewRef (ErlAtom "erlang@127.0.0.1") 2 [0,0,0,191,0,0,0,0,0,0,0,0]],
          ErlTuple [ErlAtom "call", ErlAtom "mod", ErlAtom "fun",
                    ErlList [ErlAtom "args"],
                    ErlPid (ErlAtom "erlang@127.0.0.1") 31 0 2]]
This should look familiar!
The Rex mbox handler
rex nodename mbox = do
  (ErlTuple [ from@(ErlPid (ErlAtom node) a b c),
              msg@(ErlTuple [_, ErlTuple [_, ref],
                             ErlTuple [ call,
                                        ErlAtom modName,
                                        ErlAtom funName,
                                        args,
                                        _ -- GroupLeader
                                      ]])
            ]) <- mboxRecv mbox
  debugM "Test" $ "rpc " ++ modName ++ ":" ++ funName ++ "(" ++ (show args) ++ ")"
  case (modName, funName, args) of
    ("erlang", "node", ErlNull) ->
      mboxSend mbox node (Left from) $ ErlTuple [ref, ErlAtom (nodename ++ "@127.0.0.1")]
    otherwise ->
      mboxSend mbox node (Left from) $ ErlTuple [ref, ErlAtom "haskell_equals_very_yes"]
  rex nodename mbox
Future Work
Notice I'm just returning 'haskell_equals_very_yes' for everything except erlang:node/0. I'm just excited that it's working, since it's my first stab at Haskell. Plenty of future work here.
gen_haskell?
- Use port commands to start GHC instead of java!
- Catch the output
- Mostly cut and paste from gen_java
It works!
➜ rpc_test ./_rel/rpc_test/bin/rpc_test console
Exec: /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/erts-5.10.3/bin/erlexec -boot /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/rpc_test -env ERL_LIBS /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/lib -config /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/sys.config -args_file /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/vm.args -- console
Root: /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test
/Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]
15:33:17.970 [info] Application lager started on node 'rpc_test@127.0.0.1'
15:33:17.970 [info] Application gen_java started on node 'rpc_test@127.0.0.1'
15:33:17.970 [info] Application gen_haskell started on node 'rpc_test@127.0.0.1'
15:33:17.973 [info] [gen_java][my_java] starting (pid: <0.89.0>)
15:33:17.975 [info] [gen_java][my_java] cmd: "java -server -cp priv/gen_java.jar com.devivo.gen_java.ErlangServer gen_java_my_java_rpc_test@127.0.0.1 RVDTHTVOBAMGCPHVSWZW 10"
15:33:17.981 [info] [gen_java][my_java] startup: "7399"
15:33:18.066 [info] [gen_java][my_java] startup: "Starting OTP Node 'gen_java_my_java_rpc_test@127.0.0.1' with cookie RVDTHTVOBAMGCPHVSWZW"
15:33:18.099 [info] [gen_java][my_java] startup: "Started node: gen_java_my_java_rpc_test@127.0.0.1"
15:33:18.099 [info] [gen_java][my_java] OS Pid: "7399"
15:33:18.119 [info] [gen_java][my_java] Thread Pool Size : 10
15:33:18.123 [info] [gen_hasekll][my_haskell] starting (pid: <0.95.0>)
15:33:18.123 [info] [gen_haskell][my_haskell] cmd: "runghc Node gen_haskell_my_haskell_rpc_test"
15:33:18.128 [info] [gen_haskell][my_haskell] startup: "7402"
15:33:18.600 [info] [gen_haskell][my_haskell] startup: "Starting Node: gen_haskell_my_haskell_rpc_test"
15:33:18.600 [info] [gen_haskell][my_haskell] startup: "mbox: MBox ErlPid (ErlAtom \"gen_haskell_my_haskell_rpc_test\") 1 0 1 MVar *self*"
15:33:18.600 [info] [gen_haskell][my_haskell] OS Pid: "7402"
15:33:18.601 [info] [gen_haskell][my_haskell] rpc:call('gen_haskell_my_haskell_rpc_test@127.0.0.1', erlang, node, []) = {badrpc,nodedown}
15:33:19.606 [info] [gen_haskell][my_haskell] rpc:call('gen_haskell_my_haskell_rpc_test@127.0.0.1', erlang, node, []) = 'gen_haskell_my_haskell_rpc_test@127.0.0.1'
15:33:19.607 [info] [gen_haskell][my_haskell] rpc erlang:node(ErlNull)
15:33:19.607 [info] [gen_haskell][my_haskell] rpc erlang:link(ErlList [ErlPid (ErlAtom "rpc_test@127.0.0.1") 95 0 3])
15:33:19.607 [info] Application rpc_test started on node 'rpc_test@127.0.0.1'
Eshell V5.10.3 (abort with ^G)
(rpc_test@127.0.0.1)1> my_java:call(erlang, node, []).
'gen_java_my_java_rpc_test@127.0.0.1'
(rpc_test@127.0.0.1)2> my_haskell:call(erlang, node, []).
'gen_haskell_my_haskell_rpc_test@127.0.0.1'
15:33:39.509 [info] [gen_haskell][my_haskell] rpc erlang:node(ErlNull)
(rpc_test@127.0.0.1)3> my_haskell:call(erlang, node, [1]).
haskell_equals_very_yes
15:33:46.656 [info] [gen_haskell][my_haskell] rpc erlang:node(ErlString "\SOH")
(rpc_test@127.0.0.1)4> my_java:call(erlang, node, [1]).
{error,"java.lang.ClassCastException: com.ericsson.otp.erlang.OtpErlangString cannot be cast to com.ericsson.otp.erlang.OtpErlangList"}
(rpc_test@127.0.0.1)5> q().
ok
15:33:57.068 [info] [gen_haskell][my_haskell] Sending `rex ! stop` from terminate
(rpc_test@127.0.0.1)6> 15:33:57.074 [info] [gen_java][my_java] Sending `rex ! stop` from terminate
Fin