gen_java: easy java for erlang

Introduction

  • I write Erlang at CHEF
  • I used to write Java

Analytics at Chef

Stream processing of all data that flows through Chef Server.

Used on clusters of up to 40,000 nodes!

Sends alerts to various endpoints when various things happen:

  • Hipchat
  • SMTP
  • Webhooks
  • More Coming Soon!

Want more?

BUY MY CONFIGURATION MANAGEMENT INFRASTRUCTURE

CHEF.io | Analytics Documentation

COME TO CHEFCONF: 3/31 - 4/2 Santa Clara

Alaska Pipeline

Apache Storm Pipeline

It's a pipeline, Alaska has pipelines,

so we called it Alaska

You write rules (Rule Documentation)

rules 'PCI 2.3 – Confirm telnet port not available'
 rule on run_control
  when
    name = 'telnet not listening' and
    resource_type = 'port' and
    resource_name = '23' and
    status != 'success'
  then
    audit:error("PCI 2.3 - Encrypt all non-console \
                 administrative access such as \
                 browser/Web-based management tools.")
    notify("security-team@financialcorp.com",
           "{{run.node_name}} is listening for \
           connections on port 23/telnet!")
  end
end

notify(X) will use a different set of definitions for what those messages contain.

Parsing Rules

Alaska Rules, an ANTLR grammar for Java

Events processed by Apache Storm pipeline

Rule syntax based on a subset of Complex Event Processing (CEP)

More info on that:

Configuration Web Service

Validating Rules

Dave likes writing parsers, so he gave us

Erlaska Rules

Neotoma Parser

Neotoma is a packrat parser-generator for Erlang for Parsing Expression Grammars (PEGs).

The important thing is that it handles grammars differently from ANTLR.

erlaska_rules only ever validated syntax, whereas alaska_rules is an actual compiler that generates code to evaluate in the pipeline.

erlaska_rules.erl

erlaska_rules is a module generated by the neotoma project. Once we have that parser, validating rules from webmachine was as easy as:

%% inside malformed_request/2
case erlaska_rules:parse(Rule) of
    true ->
        {false, Req, State#state{rule=Rule}};
    {false, _Reason} ->
        {true, Req, State}
end;

Problem?

This worked fine at first, but every change to the grammar had to be duplicated. Well, it turns out that we never got that far. We never actually achieved 100% compatibility.

What If?

We could call the Java parser from Erlang?

We've already got the ANTLR grammar, which is the definitive truth for correctness of rules anyway. If we could use that, we cut our work in half.

Even though Dave loves parsers.

The easy way

We could have just made a Java command-line tool for parsing rules, but that seemed like too much of a hack.

Wait

I've run Java from Erlang before with Riak_JMX. If you have to do something twice, it's time to make it generic.

Let's Do More

But actually, I'm doing something new here. What I really want to do is send Java an rpc:call and have Erlang not really even care that Java is involved.

JInterface

It turns out we've had this for a while.

It understands the ideas of:

  • Nodes
  • EPMD
  • Erlang Datatypes
  • Process Messages

JInterface User Guide | JInterface Javadoc

No RPC, No Problem

Note: OTP source links will all be to the tag R16B03-1

I already knew that RPC calls were handled by a process called `rex`, so I started digging around the Erlang source for it

rpc.erl

%% In the source for rpc.erl
-define(NAME, rex).
do_call(Node, Request, Timeout) ->
  %% ...
  Result = gen_server:call({?NAME,Node}, Request, Timeout),

So, what's `Request` look like?

It's coming in to rpc:do_call, so let's look at rpc:call which calls it.

call(N,M,F,A,infinity) when node() =:= N ->
    %% Optimize local call
    local_call(M,F,A);
call(N,M,F,A,infinity) ->
    do_call(N,
           {call,M,F,A,group_leader()},
           infinity);
call(N,M,F,A,Timeout) when is_integer(Timeout),
                           Timeout >= 0 ->
    do_call(N,
            {call,M,F,A,group_leader()},
            Timeout).

do_call

  • Some RPC magic we don't need to worry about
  • what we do care about is that it calls gen_server:call

rpc:do_call

There's some pretty nifty stuff in there about spawning monitors and trapping exits, but it's not really relevant to what we're doing here

Request

Request = {
  call        :: atom(),
  Module      :: atom(),
  Function    :: atom(),
  Arguments   :: [any()],
  GroupLeader :: pid()
}

But wait, there's more

That's not all Erlang would be sending to another node. Let's dig into the gen_server:call

gen_server:call

call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$gen_call',
                        Request, Timeout) of
        {ok,Res} ->
            Res;
        {'EXIT',Reason} ->
            exit({Reason,
                  {?MODULE, call, [Name,
                                   Request,
                                   Timeout]}})
    end.

the rabbit hole goes deeper.

WARNING: rpc is in kernel, but gen_server is in stdlib if you're digging in source

gen:call

Source: gen:call

%% deep in gen:do_call, which is called by gen:call
erlang:send(Process,
      {Label, {self(), Mref}, Request}, %% <- THIS!
      [noconnect])

Jackpot! The second argument to erlang:send/3 is our message! The actual message being sent is a 3-tuple

So, here's the path

rpc:call(            Node,                    M,F,A,    T) ->
rpc:do_call(         Node,              {call,M,F,A,GL},T) ->
gen_server:call({rex,Node},             {call,M,F,A,GL},T) ->
gen:call(       {rex,Node}, '$gen_call',{call,M,F,A,GL},T) ->
gen:do_call(    {rex,Node}, '$gen_call',{call,M,F,A,GL},T) ->
erlang:send(    {rex,Node},{'$gen_call',
                                {self(), Mref},
                                        {call,M,F,A,GL}}).
%%% ^^^ JACKPOT!

1st element: ID

'$gen_call'

2nd element: Return Address

{ From :: pid(),
  MRef :: ref() }

The From pid could be waiting for a bunch of replies. MRef lets it know which request a reply belongs to.
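
To make the role of that reference concrete, here is a rough Java analogy of the caller's side. Everything in it is hypothetical: the PendingCalls class is made up for illustration, a long stands in for an Erlang ref(), and a String stands in for the reply term. Erlang itself does this with a selective receive on the ref, not with a map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical caller-side bookkeeping; a long stands in for an Erlang ref(). */
final class PendingCalls {
    private final AtomicLong nextRef = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> waiting =
        new ConcurrentHashMap<Long, CompletableFuture<String>>();

    /** Registers a new outstanding call and returns its unique reference. */
    long open() {
        long ref = nextRef.incrementAndGet();
        waiting.put(ref, new CompletableFuture<String>());
        return ref;
    }

    /** The future that completes when the reply tagged with ref arrives. */
    CompletableFuture<String> replyFor(long ref) {
        return waiting.get(ref);
    }

    /** Delivers a {Ref, Reply} pair; returns false if nobody waits on that ref. */
    boolean deliver(long ref, String reply) {
        CompletableFuture<String> future = waiting.remove(ref);
        return future != null && future.complete(reply);
    }
}
```

Replies can arrive in any order, and each one still lands on the request that asked for it. A reply whose ref is unknown gets dropped.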

3rd element: RPC Request

Request from above

Request = {
  call        :: atom(),
  Module      :: atom(),
  Function    :: atom(),
  Arguments   :: [any()],
  GroupLeader :: pid()
}

Now we know

what Erlang sends to other erlang nodes for rpc:call

Knowing is half the battle!

Setting up the Java Side

JInterface gives us Node for free, so we can just set something up to listen for messages

public static void main(String[] stringArgs)
                                 throws Exception {
    String nodename = stringArgs[0];
    String cookie = stringArgs[1];
    OtpNode self = new OtpNode(nodename, cookie);
    OtpMbox rex = self.createMbox("rex");
    while(true) {
        // rex.receive is a blocking call,
        // so just hang out here until one shows up
        OtpErlangObject o = rex.receive();
        System.out.println("Rex received '"
                           + o.toString() + "'");
    }
}

The simplest of Java nodes. It just opens up a `rex` mailbox and waits for messages. Any rpc:call to this node will just print its content to stdout.

Deserialization in Java

This is where we start missing pattern matching. It takes about 50 lines of Java to parse out that 3-tuple that gen:do_call is sending over. And that's with exception handling abstracted out.

Source: ErlangRemoteProcedureCallMessage.java

Validate Arity

OtpErlangTuple rexCall = (OtpErlangTuple)o;
int arity = rexCall.arity();
if (arity != 3) {
    throw new Exception(
       "Rex message has invalid arity. expected 3, got "
       + arity);
}

Validate gen_call as first element:

Remember the 1st element? '$gen_call'

OtpErlangAtom gen_call =
         (OtpErlangAtom)(rexCall.elementAt(0));
String gen_call_string = gen_call.atomValue();
if (!gen_call_string.equals("$gen_call")) {
    throw new Exception(
        "Rex message should start with '$gen_call': "
        + o.toString());
}

Validate second element: {Pid::pid, Ref::ref}

OtpErlangTuple fromTuple =
         (OtpErlangTuple)(rexCall.elementAt(1));
int fromArity = fromTuple.arity();
if (fromArity != 2) {
    throw new Exception(
        "Rex message's 'from' tuple should have 2 elements, has "
         + fromArity + ": " + o.toString());
}
this.fromPid = (OtpErlangPid)(fromTuple.elementAt(0));
this.fromRef = (OtpErlangRef)(fromTuple.elementAt(1));

Validate the call tuple:

{call::atom(), Mod::atom(), Fun::atom(), Args::list(), GroupLeader::pid()}

OtpErlangTuple callTuple = (OtpErlangTuple)(rexCall.elementAt(2));
int callArity = callTuple.arity();
if (callArity != 5) {
    throw new ErlangRemoteException(this.fromPid, this.fromRef,
         "Rex message's 'call' tuple should have 5 elements, has "
         + callArity + ": " + o.toString());
}
OtpErlangAtom callAtom = (OtpErlangAtom)(callTuple.elementAt(0));
String callString = callAtom.atomValue();
if (!callString.equals("call")) {
    throw new ErlangRemoteException(this.fromPid, this.fromRef,
       "Rex message's call block should start with 'call', but it's : "
       + callString);
}

Validate M,F,A

try {
    this.mfa = new ErlangModFunArgs(
        (OtpErlangAtom)(callTuple.elementAt(1)),
        (OtpErlangAtom)(callTuple.elementAt(2)),
        (OtpErlangList)(callTuple.elementAt(3)));
    this.remoteGroupLeaderPid = (OtpErlangPid)(callTuple.elementAt(4));
} catch (Exception e) {
    throw new ErlangRemoteException(this.fromPid, this.fromRef, e);
}

Exception Handling: toErlangException

Source: ErlangRemoteException.java

turns exceptions into {error, "Message"}

public static OtpErlangObject toErlangException(Exception e) {
    OtpErlangObject[] elements = new OtpErlangObject[2];
    elements[0] = new OtpErlangAtom("error");
    elements[1] = new OtpErlangString(e.getMessage());
    return new OtpErlangTuple(elements);
}

Exception Handling: send

send knows just enough about erlang/rex to send an error message back to rpc:call

We forgot to look at that! Fortunately it's here in gen:do_call

It's waiting for a

{ref(), Reply}

So we send

public void send(OtpMbox mbox) {
    OtpErlangObject[] elements = new OtpErlangObject[2];
    elements[0] = this.fromRef;
    elements[1] = this.toErlangException();
    mbox.send(this.fromPid, new OtpErlangTuple(elements));
}

But, sometimes not.

If you noticed, we don't start using ErlangRemoteException until after we've read in the second element, the {Pid, Ref} tuple. It's not until then that we know enough about the sender to know where to send the reply. Before that, we just throw regular exceptions. We'll catch both types when we process incoming messages. If we don't know how to respond, we'll just dump the output to the console, which we'll teach the Erlang side to monitor.

try/catch

Java incoming message processing

ErlangRemoteProcedureCallMessage msg = null;
try {
    msg = new ErlangRemoteProcedureCallMessage(rex, o);
} catch (ErlangRemoteException erlE) {
    erlE.send(rex);
} catch (Exception e) {
    System.out.println("Rex received '"
        + o.toString()
        + "' but didn't know how to process it. Exception: "
        + e.getMessage());
}

Back to the Erlang side

The gen_java module

  • It's a gen_server
  • Starts a jar of your choosing!
  • When you build that jar, include gen_java.jar

The gen_java project structure

  • src/main/java <- maven will build a jar with this
  • src/main/erlang <- rebar will use this

mcdlt.jpg

At least it's not McRib

Starting the gen_java server

  • Opens a port running your jar in the JVM

Basic Handshake

Fetch = fun() ->
    X = rpc:call(Nodename, erlang, node, [], 10000),
    Nodename =:= X
end,
case wait_until(Fetch, 20, 1000) of
    ok ->
        rpc:call(Nodename, erlang, link, [self()]),
        erlang:monitor_node(Nodename, true),
        init_callback( State#gen_java_state{ port = Port, pid = Pid});
    timeout ->
        {stop, timeout}
end

Mean-Girls-stop-trying-to-make-fetch-happen.gif

Figure 2: that's so fetch

Handshake: What just happened?

  • keeps rpc calling erlang:node/0 until it gets an answer
  • if it doesn't, stop the server; otherwise
  • link the java node back to the server's process
  • monitor the java node
  • init_callback?

After we've started, there's a callback that lets you run some startup Java code before we start accepting rpc:calls.
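
For the Java-minded, the polling part of that handshake could be sketched like this. It assumes wait_until(Fetch, 20, 1000) means "try up to 20 times, 1000 ms apart", which is a guess since wait_until's definition isn't shown here. The Retry class and its waitUntil method are hypothetical and not part of gen_java.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of gen_java: polls a check until it passes. */
final class Retry {
    private Retry() {}

    /**
     * Runs the check up to `attempts` times, sleeping delayMillis between
     * failed attempts. Returns true as soon as the check passes and false
     * when the attempts run out or the thread is interrupted.
     */
    static boolean waitUntil(BooleanSupplier check, int attempts, long delayMillis) {
        for (int i = 0; i < attempts; i++) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (i < attempts - 1) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                    return false;
                }
            }
        }
        return false;
    }
}
```

A call like Retry.waitUntil(check, 20, 1000) would then play the same role as the Erlang expression, with true and false standing in for ok and timeout.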

Error logging

handle_info/2

handle_info({Port, {data, {_Type, Data}}},
            #gen_java_state {port = Port,
                             module = M } = State) ->
    lager:info("[gen_java][~p] ~s", [M, Data]),
    {noreply, State};

Now that we've got a port running this JVM, anything that Java prints with System.out.println will end up in your Erlang application's log.

Recap

We're sending rpc:calls to the java node

we can send error messages back

  • console
  • rpc responses

So, what do we do with actual rpc calls?

The Easy Way : Hard Coded

There are some things we just want every Java node to be able to do:

Needed by our Handshake

  • erlang:node/0
  • erlang:link/1

POC Methods

  • erlang:abs/1 x2

Nice for JVM inspection

  • java:system_properties/0
  • java:system_env/0
  • java:input_arguments/0

WTF is the java module?!

I made it up. I made the erlang module up too. Java doesn't have these

Let's talk about how we map erlang MFAs

All Others

  • must be java methods of type public static final
  • must have all arguments and return types of classes provided by JInterface
  • since java reflection is a bit expensive, we cache the Method objects.

Initializing the RPC Method Cache

Map<ErlangFunctionCacheKey, Method> RPCCache =
    new HashMap<ErlangFunctionCacheKey, Method>();
RPCCache.put(
        new ErlangFunctionCacheKey(
                "erlang", "abs", OtpErlangDouble.class),
        Erlang.class.getMethod("abs", OtpErlangDouble.class));
RPCCache.put(
        new ErlangFunctionCacheKey(
                "erlang", "abs", OtpErlangLong.class),
        Erlang.class.getMethod("abs", OtpErlangLong.class));

The last argument to ErlangFunctionCacheKey is a variable-length list of argument classes.
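
A key like that only works in a HashMap if two keys built from the same module, function, and argument classes compare equal. Here's a minimal sketch of what such a key might look like. RpcKey is a hypothetical stand-in; the real ErlangFunctionCacheKey may well be written differently.

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical stand-in for ErlangFunctionCacheKey; the real class may differ. */
final class RpcKey {
    private final String module;
    private final String function;
    private final Class<?>[] argTypes;

    RpcKey(String module, String function, Class<?>... argTypes) {
        this.module = module;
        this.function = function;
        this.argTypes = argTypes.clone(); // defensive copy keeps the key immutable
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof RpcKey)) {
            return false;
        }
        RpcKey k = (RpcKey) other;
        return module.equals(k.module)
            && function.equals(k.function)
            && Arrays.equals(argTypes, k.argTypes);
    }

    @Override
    public int hashCode() {
        return 31 * Objects.hash(module, function) + Arrays.hashCode(argTypes);
    }

    @Override
    public String toString() {
        return module + ":" + function + "/" + argTypes.length;
    }
}
```

Because the argument classes are part of the key, the two abs overloads get separate cache entries, just like the two RPCCache.put calls above.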

Erlang.java

dat java module

// wrapper for java.lang.System.getProperties()
RPCCache.put(
        new ErlangFunctionCacheKey("java", "system_properties"),
        Java.class.getMethod("system_properties"));

RPCCache.put(
        new ErlangFunctionCacheKey("java", "system_env"),
        Java.class.getMethod("system_env"));

RPCCache.put(
        new ErlangFunctionCacheKey("java", "input_arguments"),
        Java.class.getMethod("input_arguments"));

ACHIEVEMENT UNLOCKED: Java dot java

Java.java

public static OtpErlangList system_properties() {
    List<OtpErlangTuple> l = new ArrayList<OtpErlangTuple>();
    Iterator<Map.Entry<Object, Object>> it =
        System.getProperties().entrySet().iterator();
    while(it.hasNext()) {
        Map.Entry<Object, Object> i = it.next();
        OtpErlangObject[] elems = new OtpErlangObject[2];
        elems[0] = new OtpErlangAtom(i.getKey().toString());
        elems[1] = new OtpErlangBinary(
                         i.getValue().toString().getBytes());
        OtpErlangTuple t = new OtpErlangTuple(elems);
        l.add(t);
    }
    return new OtpErlangList(l.toArray(new OtpErlangObject[0]));
}

java:system_properties()

(erlang@127.0.0.1)1> net_adm:ping('java@127.0.0.1').
pong
(erlang@127.0.0.1)2> rpc:call('java@127.0.0.1', java, system_properties, []).
[{'java.runtime.name',<<"Java(TM) SE Runtime Environment">>},
 {'sun.boot.library.path',<<"/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre/lib">>},
 {'java.vm.version',<<"24.71-b01">>},
 {gopherProxySet,<<"false">>},
 {'java.vm.vendor',<<"Oracle Corporation">>},
 {'java.vendor.url',<<"http://java.oracle.com/">>},
 {'path.separator',<<":">>},
 {'java.vm.name',<<"Java HotSpot(TM) 64-Bit Server VM">>},
 {'file.encoding.pkg',<<"sun.io">>},
 {'user.country',<<"US">>},
 {'sun.java.launcher',<<"SUN_STANDARD">>},
 {'sun.os.patch.level',<<"unknown">>},
 {'java.vm.specification.name',<<"Java Virtual Machine Specification">>},
 {'java.runtime.version',<<"1.7.0_71-b14">>},
 {'java.awt.graphicsenv',<<"sun.awt.CGraphicsEnvironment">>},
 {'java.endorsed.dirs',<<"/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre/lib/endorsed">>},
 {'os.arch',<<"x86_64">>},
 {'java.io.tmpdir',<<"/var/folders/hl/zf_j1bvs7_b18rj7bbsm35p00000gp/T/">>},
 {'line.separator',<<"\n">>},
 {'java.vm.specification.vendor',<<"Oracle Corporation">>},
 {'os.name',<<"Mac OS X">>},
 {'sun.jnu.encoding',<<"UTF-8">>},
 {'java.library.path',<<"/System/Library/Java/Extensions:/usr/lib/java:.">>},
 {'java.specification.name',<<"Java Platform API Specification">>},
 {'java.class.version',<<"51.0">>},
 {'sun.management.compiler',<<"HotSpot 64-Bit Tiered Compilers">>},
 {'os.version',<<"10.10.2">>},
 {'http.nonProxyHosts',<<"local|*.local|169.254/16|*.169.254/16">>},
 {'user.timezone',<<>>},
 {'java.awt.printerjob',<<"sun.lwawt.macosx.CPrinterJob">>},
 {'file.encoding',<<"UTF-8">>},
 {'java.specification.version',<<"1.7">>},
 {'java.class.path',<<"target/gen_java-0.1.2-SNAPSHOT-jar-with-dependencies.jar">>},
 {'java.vm.specification.version',<<"1.7">>},
 {'sun.java.command',<<"com.devivo.gen_java.ErlangServer java@127.0.0.1 cookie 10">>},
 {'java.home',<<"/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre">>},
 {'sun.arch.data.model',<<"64">>},
 {'user.language',<<"en">>},
 {'java.specification.vendor',<<"Oracle Corporation">>},
 {'awt.toolkit',<<"sun.lwawt.macosx.LWCToolkit">>},
 {'java.vm.info',<<"mixed mode">>},
 {'java.version',<<"1.7.0_71">>},
 {'java.vendor',<<"Oracle Corporation">>},
 {'file.separator',<<"/">>},
 {'java.vendor.url.bug',<<"http://bugreport.sun.com/bugreport/">>},
 {'sun.io.unicode.encoding',<<"UnicodeBig">>},
 {'sun.cpu.endian',<<"little">>},
 {socksNonProxyHosts,<<"local|*.local|169.254/16|*.169.254/16">>},
 {'ftp.nonProxyHosts',<<"local|*.local|169.254/16|*.169.254/16">>},
 {'sun.cpu.isalist',<<>>}]

What about your own methods?

Module: Full Java Class Name

Function: Java Method Name

Args: ARGS!
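
That mapping is plain Java reflection. Below is a minimal sketch of a lookup, assuming the module atom is a fully qualified class name and the function atom is a method name. RpcResolver is a hypothetical class; gen_java's own find(...) isn't shown in this talk and may differ.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical resolver; gen_java's own find(...) is not shown in the talk. */
final class RpcResolver {
    private RpcResolver() {}

    /**
     * Treats module as a fully qualified class name and function as a method
     * name. Returns the public static method with exactly those parameter
     * types, or null when there is no such class or method.
     */
    static Method find(String module, String function, Class<?>... argTypes) {
        try {
            Method m = Class.forName(module).getMethod(function, argTypes);
            // Only static methods can be invoked without a target object.
            return Modifier.isStatic(m.getModifiers()) ? m : null;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return null; // the caller would answer with a badrpc tuple
        }
    }
}
```

A null result is the case where the caller would send a badrpc back instead of invoking anything.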

Caching?

check the cache

if(RPCCache.containsKey(msg.getMFA().getKey())) {
    Method m = RPCCache.get(msg.getMFA().getKey());
    msg.setMethod(m);
    pool.execute(msg);
} else {
    //// This means it's not in the cache, we should
    //// try and find it and add it.
    Method m = find(msg.getMFA().getKey());
    if (m != null) {
        RPCCache.put(msg.getMFA().getKey(), m);
        msg.setMethod(m);
        pool.execute(msg);
    } else {
        System.out.println("Bad RPC: " +
            msg.getMFA().getKey().toString());
        //// we couldn't add it, be nice and send a badrpc error back
        msg.send(msg.toErlangBadRPC());
    }
}

msg.toErlangBadRPC()

toErlangBadRPC()

% Bad RPC calls look like this:
{badrpc,{'EXIT',{undef,[{Module,Fun,[],[]},
                {rpc,'-handle_call_call/6-fun-0-',5,
                     [{file,"rpc.erl"},{line,205}]}]}}}

So we construct that tuple as a response and send it.

Caching Payoff!

Reflection is only done once per method.

We're aiming for the pool, right?

pool.execute(msg);

We went ahead and added some thread pooling on the java side.

Otherwise all the processing happens in one place. What if you asked it to do hard things?

pool.execute(msg) hands the message to a pool thread, and the message's run() method is where we package up the method's return value and send it back to Erlang.

public void run() {
    OtpErlangObject result = new OtpErlangAtom("null");
    try {
        result = (OtpErlangObject)
            this.method.invoke(null, getMFA().getArgs().elements());
    } catch (Exception e) {
        //// This could "technically" throw an InvocationTargetException
        //// or an IllegalAccessException. We'll write defensive code
        //// for that eventually
        System.out.println(e.getClass().getName() + " : " + e.getMessage());
        result = error(e.getClass().getName() + " : " + e.getMessage());
    }
    this.send(result);
}
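
If the pool part is unfamiliar, here is a self-contained sketch of the same shape using only the JDK. It is not gen_java's code: PoolSketch and squareAll are hypothetical, a BlockingQueue stands in for the mailbox, and squaring a number stands in for the invoked method.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a queue stands in for the Erlang-facing mailbox. */
final class PoolSketch {
    private PoolSketch() {}

    /** Squares each input on a worker thread; replies look like "ref:result". */
    static BlockingQueue<String> squareAll(int threadCount, int... inputs) {
        final BlockingQueue<String> replies = new LinkedBlockingQueue<String>();
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < inputs.length; i++) {
            final int ref = i;          // plays the role of the caller's ref()
            final int arg = inputs[i];
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    // Tag the reply so the caller can match it to its request.
                    replies.add(ref + ":" + (arg * arg));
                }
            });
        }
        pool.shutdown(); // accept no new work; queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return replies;
    }
}
```

Replies can land in any order, which is exactly why each one carries its reference.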

Wrapping Responses

public void send(OtpErlangObject resp) {
    this.rex.send(this.fromPid, wrapResponse(resp));
}

public OtpErlangTuple wrapResponse(OtpErlangObject resp) {
    OtpErlangObject[] elements = new OtpErlangObject[2];
    elements[0] = this.fromRef;
    elements[1] = resp;
    return new OtpErlangTuple(elements);
}

this.send makes sure to send it to the right place

wrapResponse makes sure to include that ref() we need for RPC

Erlang Developer Experience

You might remember that I'm kind of a user experience nut

Cuttlefish

Your Java Module

-module(my_java).

-compile({parse_transform, gen_java_parse_transform}).

Your sys.config

[{gen_java, [
     {modules, [
         {my_java, [
             {jar, "/path/to/my.jar"},
             {thread_count, 10}
                        ]}
               ]}
            ]}
].

Your Supervisor

start it with my_java:start_link/0 or

{my_java,
    {my_java, start_link, []},
    permanent, 5000, worker, [my_java]},

start_link/0?! Parse Transform

wrappers for gen_java functions

17 = my_java:call(erlang, abs, [-17]).
<<"your heart's desire">> =
     my_java:call('com.my.package','myMethod',[]).

gen_java_parse_transform.erl

5 Functions for FREE

This whole file just looks for a module's name, and subs it in to 5 functions

-export([start_link/0,start/0,call/3,call/4,stop/0]).

stop() ->
    gen_java:stop(my_java).

call(Module, Function, Args, Timeout) ->
    gen_java:call(my_java, Module, Function, Args, Timeout).

call(Module, Function, Args) ->
    gen_java:call(my_java, Module, Function, Args).

start() ->
    gen_java:start(my_java).

start_link() ->
    gen_java:start_link(my_java).

init callback

Remember that? Put it here, and it'll get called right after the handshake.

-spec init(atom()) -> ok.
init(Nodename) ->
    SomeState = {some, thing, maybe_a_file_path},
    rpc:call(Nodename,
             'com.yourcompany.package',
             'init', [SomeState]).

Adding convenience

-spec my_method(binary()) -> binary() | gen_java:badrpc().
my_method(Binary) ->
    call('com.my.package','myMethod',[Binary]).

Then using java in your app is as easy as

my_java:my_method(Binary).

Bringing it Back to CHEF Analytics

erlaska_rules is out!

alaska_rules.jar is in!

sys.config

[{gen_java, [
     {modules, [
         {alaska_rules, [
             {jar, "priv/alaska_rules.jar"},
             {thread_count, 10}
                        ]}
               ]}
            ]}
].

alaska_rules.erl

-module(alaska_rules).

-compile({parse_transform, gen_java_parse_transform}).

-export([valid_rule/1, valid_rule_group/1, init/1]).

-spec valid_rule(binary()) ->
    true | {error, string()} | gen_java:badrpc().
valid_rule(Bin) ->
    call('com.chef.analytics.rules.erlang.RuleValidator',
         'validRule', [Bin]).

-spec valid_rule_group(binary()) ->
    true | {error, string()} | gen_java:badrpc().
valid_rule_group(Bin) ->
    call('com.chef.analytics.rules.erlang.RuleValidator',
    'validRuleGroup', [Bin]).

What do those java methods look like?

public static OtpErlangObject validRule(OtpErlangBinary ruleBin) {
    try {
        String ruleText = new String(ruleBin.binaryValue());
        Rule r = compiler.compile(ruleText);
        return new OtpErlangAtom(true);
    } catch (Exception e) {
        return ErlangRemoteException.toErlangException(e);
    }
}

public static OtpErlangObject validRuleGroup(OtpErlangBinary ruleGrpBin) {
    try {
        String ruleGrpText = new String(ruleGrpBin.binaryValue());
        RuleGroup rg = compiler.compileGroup(ruleGrpText);
        return new OtpErlangAtom(true);
    } catch (Exception e) {
        return ErlangRemoteException.toErlangException(e);
    }
}

init/1

We have some JSON schemas that alaska_rules.jar uses for validation of attributes.

init/1 reads them in as a list of binaries and then sends them over to the java node

init(Nodename) ->
    Dir = schema_dir(),
    JSONSchemas = filelib:wildcard(filename:join([Dir, "*.json"])),
    Schemas = [begin
                   {ok, Bin} = file:read_file(Filename),
                   {list_to_atom(filename:basename(Filename)), Bin}
               end || Filename <- JSONSchemas],
    rpc:call(Nodename,
             'com.chef.analytics.rules.erlang.RuleValidator',
             'setSchemas', [Schemas]),
    ok.

Bringing it back to WebMachine

case alaska_rules:valid_rule_group(
             nc_obj_rule:getval(rule, Rule)) of
    true ->
        lager:debug("malformed_request: rule syntax good"),
        {false, Req, State};
    {error, Msg} ->
        lager:debug("Invalid rule syntax: ~s", [Msg]),
        mf_return(Msg, [], Req, State);
    {badrpc, nodedown} ->
        lager:error("Alaska Rules node down, no validation possible"),
        NewReq = req_helper([
            {set_resp_header, ["content-type", "application/json"]},
            {set_resp_body, [jiffy:encode(
                {[{error, <<"server side validation error">>}]})]}
        ], Req),
        {{halt, 500}, NewReq, State}
end.

Wrapping Up

All in all, this is just a wrapper for the hard stuff Erlang gave us for free. But what if they didn't?

Erlang Haskell Interface

haskell.png

Introducing Erlang Haskell Interface 0.2 github source

Erlang gives you zero Haskell for free

But somebody did: hackage erlang-0.1

What I got:

Erlang Types in Haskell

data ErlType = ErlNull
             | ErlInt Int
             | ErlBigInt Integer
             | ErlString String
             | ErlAtom String
             | ErlBinary [Word8]
             | ErlList [ErlType]
             | ErlTuple [ErlType]
             | ErlPid ErlType Int Int Int     -- node id serial creation
             | ErlPort ErlType Int Int        -- node id creation
             | ErlRef ErlType Int Int         -- node id creation
             | ErlNewRef ErlType Int [Word8]  -- node creation id
             deriving (Eq, Show)

Packing functions

putC = putWord8 . fromIntegral
putn = putWord16be . fromIntegral
putN = putWord32be . fromIntegral
puta = putByteString . B.pack
putA = putByteString . C.pack

getC = liftM fromIntegral getWord8
getn = liftM fromIntegral getWord16be
getN = liftM fromIntegral getWord32be
geta = liftM B.unpack . getByteString
getA = liftM C.unpack . getByteString

Half a Protocol

Looks like erlang-0.1 knew how to connect to an Erlang node from Haskell

It wanted it one way, but I wanted the other

Getting the old one working

nano-md5 dependency didn't work anymore, so replaced with PureMD5

Existing Documentation wasn't great, but it might have been me

Spinning up an Erlang node in Haskell

start

start nodename = do
    setupLoggers DEBUG
    infoM "Test" $ "Starting Node: " ++ nodename
    self <- createSelf nodename
    mbox <- createMBox self
    debugM "Test" $ "mbox: " ++ (show mbox)
    forever $ do
        rex_mbox <- createNamedMBox "rex" self
        forkIO $ rex nodename rex_mbox
        return ()

createSelf: Creating the Haskell Node

Processes.hs

createSelf          :: String -> IO Self
createSelf nodename = do
    inbox <- newEmptyMVar
    forkIO $ serve nodename inbox
    forkIO $ self nodename inbox
    node <- return .  Self $ putMVar inbox
    nk_mbox <- createNamedMBox "net_kernel" node
    forkIO $ net_kernel nk_mbox
    return node

self is the thing that routes those messages

serve

serve is the function that connects to epmd, opens up a listener, and then puts messages in an mbox.

Learning EPMD

Reserving a port

EPMD_ALIVE2_REQ

Open a socket with this request and keep it open… forever.

Here's the message EPMD expects

Bytes  Content
1      120
2      Port to reserve
1      77 (means normal erlang node)
1      Protocol (0 = tcp/ipv4)
2      Highest version (5 = R6B and higher)
2      Lowest version (5 = R6B and higher)
2      Length in bytes of nodename field
X      Nodename, X = the length given in the previous field
2      Length of Extras, we used 0
Y      Extras, Y = the length given in the previous field, but we sent none

What's that look like?

EPMD_ALIVE2_REQ.png

Figure 4: Wiretap of ALIVE2_REQ

EPMD_ALIVE2_RESP.png

Figure 5: Bytes of ALIVE2_RESP

Haskell sends an EPMD_ALIVE2_REQ

epmdAlive2Req :: String -> Int -> IO ()
epmdAlive2Req node port = withEpmd $ \hdl -> do
    let msg = runPut $ tag 'x' >>
                       putn port >>
                       putC 77 >> -- node type
                       putC 0 >>  -- protocol
                       putn erlangVersion >>
                       putn erlangVersion >>
                       putn (length node) >>
                       putA node >>
                       putn 0 -- "Extra" length, 0 for none
    let len = fromIntegral $ B.length msg
    let out = runPut $ putn len >> putLazyByteString msg
    forever $ do
        B.hPut hdl out
        hFlush hdl
        B.hGetContents hdl
        return ()

See that forever call? It just hangs out, letting EPMD know you still love it.

TIL: You can run `epmd -debug` to see what's coming across the wire through EPMD
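
For comparison, here's roughly how the same request bytes could be assembled in Java with a ByteBuffer. This is only a sketch of the field layout listed earlier plus the 2-byte length prefix the Haskell code adds. EpmdAlive2 and its request method are hypothetical names, and encoding the node name as ISO-8859-1 is an assumption. It builds the bytes and sends nothing.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds the bytes of an ALIVE2_REQ, sends nothing. */
final class EpmdAlive2 {
    private EpmdAlive2() {}

    static byte[] request(String nodeName, int port, int version) {
        // Assumption: node names are plain Latin-1 text.
        byte[] name = nodeName.getBytes(StandardCharsets.ISO_8859_1);
        int bodyLength = 1 + 2 + 1 + 1 + 2 + 2 + 2 + name.length + 2;
        ByteBuffer buf = ByteBuffer.allocate(2 + bodyLength); // big-endian by default
        buf.putShort((short) bodyLength);   // length prefix, not counting itself
        buf.put((byte) 120);                // ALIVE2_REQ tag, ASCII 'x'
        buf.putShort((short) port);         // port this node listens on
        buf.put((byte) 77);                 // node type: normal Erlang node
        buf.put((byte) 0);                  // protocol: tcp/ipv4
        buf.putShort((short) version);      // highest version
        buf.putShort((short) version);      // lowest version
        buf.putShort((short) name.length);  // length of the node name
        buf.put(name);                      // node name bytes
        buf.putShort((short) 0);            // length of extras: none
        return buf.array();
    }
}
```

The cast to short keeps the low 16 bits, so ports above 32767 still come out right on the wire.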

The Distribution Handshake

Handshake Documentation

ALIVE2_REQ isn't even a quarter of the handshake.

We also have to do a back and forth over the port we're actually listening on

send_name            ------>            recv_name

recv_status          <------          send_status

send_status          ------>          recv_status

recv_challenge       <------       send_challenge

send_challenge_reply ------> recv_challenge_reply

recv_challenge_ack   <------   send_challenge_ack

SEND_NAME.png

Figure 6: Here's an example of SEND_NAME

Let's gloss over this. If you want to see it, I did it here: Network.hs

the serve function: listening for erlang communication

Opens a socket on port X

Does the ALIVE2_REQ with port X to EPMD

Does the Distributed Erlang Handshake with the ErlNode

Opens up two way communication erl <-> hs

Routes any received messages to self via ErlDispatch

serve

serve :: String -> MVar ErlMessage -> IO ()
serve nodename outbox = S.withSocketsDo $
    do
        sock <- S.socket (S.addrFamily serveraddr) S.Stream S.defaultProtocol
        S.bindSocket sock (S.addrAddress serveraddr)
        port <- S.socketPort sock
        forkIO $ epmdAlive2Req nodename $ read $ show port
        S.listen sock 5
        -- Create a lock to use for synchronizing access to the handler
        lock <- newMVar ()
        -- Loop forever waiting for connections.  Ctrl-C to abort.
        procRequests lock sock

procRequests: processing incoming socket connections from Erlang

procRequests :: MVar () -> S.Socket -> IO ()
procRequests lock mastersock =
    do (connsock, clientaddr) <- S.accept mastersock
       handleLog lock clientaddr $
          B.pack "Foreign.Erlang.Server: client connected"
       forkIO $ procMessages lock connsock clientaddr
       procRequests lock mastersock

procMessages: processing messages from that socket

procMessages :: MVar () -> S.Socket -> S.SockAddr -> IO ()
procMessages lock connsock clientaddr =
    do connhdl <- S.socketToHandle connsock ReadWriteMode
       hSetBuffering connhdl NoBuffering
       (to, send, recv) <- erlConnectS connhdl nodename
       mvar <- newEmptyMVar
       forkIO $ nodeSend mvar send
       forkIO $ nodeRecv mvar recv outbox
       let node = putMVar mvar
       putMVar outbox $ ConnectedNode to node

nodeRecv: routing incoming messages

{-
A `nodeRecv` thread is responsible for communication from an Erlang
process.  It receives messages from the network and dispatches them as
appropriate.
-}
nodeRecv mvar recv outbox = loop
  where
    loop = do
        (mctl, mmsg) <- recv
        case mctl of
            -- Nothing is a keepalive.  All we want to do is echo it.
            Nothing  -> putMVar mvar (Nothing, Nothing)
            -- A real message goes to self to be dispatched.
            Just ctl -> putMVar outbox $
                ErlDispatch ctl (fromJust mmsg)
        loop

Funky Middle Syntax

Protocol between connected nodes

Turns out we need to figure out how to interpret Erlangy packets coming in now

Here's the distilled version of what they could be:

{1, FromPid, ToPid}                       %% LINK
{2, Cookie, ToPid}                        %% SEND
{3, FromPid, ToPid, Reason}               %% EXIT
{4, FromPid, ToPid}                       %% UNLINK
{5}                                       %% NODE_LINK
{6, FromPid, Cookie, ToName}              %% REG_SEND
{7, FromPid, ToPid}                       %% GROUP_LEADER
{8, FromPid, ToPid, Reason}               %% EXIT2
{12, Cookie, ToPid, TraceToken}           %% SEND_TT
{16, FromPid, Cookie, ToName, TraceToken} %% REG_SEND_TT
{18, FromPid, ToPid, TraceToken, Reason}  %% EXIT2_TT
{19, FromPid, ToProc, Ref}                %% MONITOR_P
{20, FromPid, ToProc, Ref}                %% DEMONITOR_P
{21, FromProc, ToPid, Ref, Reason}        %% MONITOR_P_EXIT
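
One way to make those tags easier to work with is to decode the control tuple into a named type before dispatching. This is only a sketch: `ErlType` here is a cut-down stand-in for the library's term type, and `Control` and `decodeControl` are hypothetical names that do not appear in the original code.

```haskell
-- Stand-in for the library's term type (assumption: only the
-- constructors this sketch needs are reproduced here).
data ErlType
  = ErlInt Int
  | ErlAtom String
  | ErlTuple [ErlType]
  deriving (Eq, Show)

-- Hypothetical names for a few of the control messages listed above.
data Control
  = Link ErlType ErlType          -- {1, FromPid, ToPid}
  | Send ErlType                  -- {2, Cookie, ToPid}; keeps ToPid
  | Exit ErlType ErlType ErlType  -- {3, FromPid, ToPid, Reason}
  | Unlink ErlType ErlType        -- {4, FromPid, ToPid}
  | NodeLink                      -- {5}
  | RegSend ErlType ErlType       -- {6, FromPid, Cookie, ToName}; keeps FromPid, ToName
  | Unhandled Int                 -- any other tag, or a known tag with unexpected arity
  deriving (Eq, Show)

-- Decode a control tuple. Returns Nothing when the term is not a
-- tuple that starts with an integer tag.
decodeControl :: ErlType -> Maybe Control
decodeControl (ErlTuple (ErlInt tag : fields)) =
  Just $ case (tag, fields) of
    (1, [from, to])            -> Link from to
    (2, [_cookie, to])         -> Send to
    (3, [from, to, reason])    -> Exit from to reason
    (4, [from, to])            -> Unlink from to
    (5, [])                    -> NodeLink
    (6, [from, _cookie, name]) -> RegSend from name
    _                          -> Unhandled tag
decodeControl _ = Nothing
```

Only SEND (2) and REG_SEND (6) are acted on by the `self` loop shown in these slides, so everything else can fall through to `Unhandled` without changing behavior.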

Here's how the self process is handling them

Full Function: Processes.hs

I left a bunch of clauses off this slide

self                :: String -> MVar ErlMessage -> IO ()
self nodename inbox = loop 1 [] [] []
  where
    loop id registered mboxes nodes = do
        msg <- takeMVar inbox
        debugM "Foreign.Erlang.Processes"
            $ "loop msg recv'd: " ++ (show msg)
        case msg of
          -- other cases omitted for clarity
          ErlDispatch ctl msg -> do
            case ctl of
              ErlTuple [ErlInt 2, _, pid] ->
                maybe (return ()) ($ msg) $ lookup pid mboxes
              ErlTuple [ErlInt 6, from, _, pid] ->
                maybe (return ())
                   ($ (ErlTuple [from, msg])) $ lookup pid registered
              _ -> return ()
            loop id registered mboxes nodes
          -- This clause is for when Erlang has connected to this node
          -- we're just telling this node to add it to the connected nodes.
          ConnectedNode to node -> do
            case lookup to nodes of
                Just _ ->
                  loop id registered mboxes nodes
                Nothing ->
                  loop id registered mboxes ((to, node):nodes)
          ErlStop -> return ()
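
The shape of that loop (take a message from an `MVar`, look up a handler, call it, recurse) can be tried in isolation. The sketch below is a toy version under stated assumptions: `Msg`, `router`, and `demo` are made-up names, and payloads are plain strings rather than Erlang terms.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Toy message type (assumption: strings instead of Erlang terms).
data Msg
  = Dispatch String String  -- registered name, payload
  | Stop

-- Take messages from the inbox one at a time and hand each payload to
-- the handler registered under that name. Unknown names are dropped,
-- like the `maybe (return ())` in the loop above.
router :: [(String, String -> IO ())] -> MVar Msg -> MVar () -> IO ()
router registered inbox done = loop
  where
    loop = do
      msg <- takeMVar inbox
      case msg of
        Dispatch name payload -> do
          maybe (return ()) ($ payload) (lookup name registered)
          loop
        Stop -> putMVar done ()

-- Run the router on its own thread, send a few messages, and return
-- what the handlers saw, in arrival order.
demo :: IO [String]
demo = do
  inbox <- newEmptyMVar
  done  <- newEmptyMVar
  seen  <- newMVar []
  let record tag payload =
        modifyMVar_ seen (return . ((tag ++ ": " ++ payload) :))
  _ <- forkIO $
         router [("rex", record "rex"), ("net_kernel", record "net_kernel")]
                inbox done
  putMVar inbox (Dispatch "rex" "call")
  putMVar inbox (Dispatch "nobody" "dropped")
  putMVar inbox (Dispatch "net_kernel" "ping")
  putMVar inbox Stop
  takeMVar done
  reverse <$> readMVar seen
```

Because a single thread owns the loop, the registered-name table never needs a lock; the `MVar` inbox is the only shared state.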

net_kernel

net_kernel mbox

-- This is the loop that receives erlang messages to the net_kernel
-- module. Without it, you can't ping this node
net_kernel mbox = do
    (ErlTuple [
        from@(ErlPid (ErlAtom node) a b c),
        msg@(ErlTuple [_,ErlTuple [_,ref],_])
        ]) <- mboxRecv mbox
    mboxSend mbox node (Left from) $ ErlTuple [ref, ErlAtom "yes"]
    net_kernel mbox
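
The pattern match in that `do` block fails at runtime if anything other than a ping shows up. A total version of the same idea might look like the sketch below. It assumes the ping arrives as `{'$gen_call', {Pid, Ref}, {is_auth, Node}}` wrapped with the sender's pid, matching the tuple shape destructured above; `pingReply` is a hypothetical helper and `ErlType` is a cut-down stand-in for the library type.

```haskell
-- Stand-in for the library's term type (assumption: trimmed to what
-- this sketch needs).
data ErlType
  = ErlAtom String
  | ErlInt Int
  | ErlTuple [ErlType]
  deriving (Eq, Show)

-- Given what the net_kernel mailbox received, return the pid to answer
-- and the reply term {Ref, yes}. Anything that is not an is_auth call
-- yields Nothing instead of a pattern-match failure.
pingReply :: ErlType -> Maybe (ErlType, ErlType)
pingReply (ErlTuple
             [ from
             , ErlTuple [ ErlAtom "$gen_call"
                        , ErlTuple [_, ref]
                        , ErlTuple [ErlAtom "is_auth", _]
                        ]
             ]) = Just (from, ErlTuple [ref, ErlAtom "yes"])
pingReply _ = Nothing
```

The loop could then `case pingReply msg of` and simply keep going on `Nothing`.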

An rpc:call received by Haskell

erlang: rpc:call('haskell@127.0.0.1', 'mod', 'fun', ['args']).
ErlPid (ErlAtom "erlang@127.0.0.1") 38 0 2
ErlTuple [ErlAtom "$gen_call",
          ErlTuple [ErlPid (ErlAtom "erlang@127.0.0.1") 38 0 2,
                    ErlNewRef (ErlAtom "erlang@127.0.0.1") 2 [0,0,0,191,0,0,0,0,0,0,0,0]],
          ErlTuple [ErlAtom "call",
                    ErlAtom "mod",
                    ErlAtom "fun",
                    ErlList [ErlAtom "args"],
                    ErlPid (ErlAtom "erlang@127.0.0.1") 31 0 2]]

This should look familiar!
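
To make the nesting in that term easier to see, here is a sketch that pulls the interesting fields into a record. `RpcCall` and `parseRpc` are hypothetical names, and `ErlType` is a cut-down stand-in for the library's type, so treat this as an illustration rather than the library's API.

```haskell
-- Stand-in for the library's term type (assumption: trimmed to what
-- this sketch needs).
data ErlType
  = ErlNull
  | ErlInt Int
  | ErlAtom String
  | ErlList [ErlType]
  | ErlTuple [ErlType]
  deriving (Eq, Show)

-- The parts of an rpc:call that a handler cares about.
data RpcCall = RpcCall
  { rpcCaller :: ErlType  -- pid to reply to
  , rpcRef    :: ErlType  -- reference to echo back in the reply
  , rpcModule :: String
  , rpcFun    :: String
  , rpcArgs   :: ErlType
  } deriving (Eq, Show)

-- Match {'$gen_call', {Caller, Ref}, {call, Mod, Fun, Args, GroupLeader}}.
parseRpc :: ErlType -> Maybe RpcCall
parseRpc (ErlTuple
            [ ErlAtom "$gen_call"
            , ErlTuple [caller, ref]
            , ErlTuple [ ErlAtom "call"
                       , ErlAtom m
                       , ErlAtom f
                       , args
                       , _groupLeader
                       ]
            ]) = Just (RpcCall caller ref m f args)
parseRpc _ = Nothing
```

The reply that `rpc:call` waits for is then `{Ref, Result}`, sent back to the caller.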

The Rex mbox handler

Test.hs:rex mbox

rex nodename mbox = do
    (ErlTuple [
        from@(ErlPid (ErlAtom node) a b c),
        msg@(ErlTuple [_,ErlTuple [_,ref],ErlTuple [
                call,
                ErlAtom modName,
                ErlAtom funName,
                args,
                _ -- GroupLeader
            ])
        ]) <- mboxRecv mbox
    debugM "Test" $ "rpc " ++ modName ++ ":" ++ funName ++ "(" ++ (show args) ++ ")"
    case (modName, funName, args) of
      ("erlang", "node", ErlNull) ->
        mboxSend mbox node (Left from) $
            ErlTuple [ref, ErlAtom (nodename ++ "@127.0.0.1") ]
      _ ->
        mboxSend mbox node (Left from) $
            ErlTuple [ref, ErlAtom "haskell_equals_very_yes"]
    rex nodename mbox

Future Work

Notice that, apart from erlang:node(), every call just returns 'haskell_equals_very_yes'. I'm excited that it works at all, since this is my first stab at Haskell. Plenty of future work here.
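
One possible direction for that future work is a lookup table keyed on module and function, keeping the current atom as the fallback. This is a sketch under assumptions: `Handler`, `table`, and `dispatch` are hypothetical names, the `lists:reverse` entry is made up for illustration, and `ErlType` is a cut-down stand-in for the library type.

```haskell
-- Stand-in for the library's term type (assumption: trimmed to what
-- this sketch needs).
data ErlType
  = ErlNull
  | ErlInt Int
  | ErlAtom String
  | ErlList [ErlType]
  deriving (Eq, Show)

-- A handler gets the argument term and may decline by returning Nothing.
type Handler = ErlType -> Maybe ErlType

-- Hypothetical table of supported (module, function) pairs.
table :: String -> [((String, String), Handler)]
table nodename =
  [ (("erlang", "node"), nodeName)
  , (("lists", "reverse"), reverseList)
  ]
  where
    -- erlang:node() with no arguments, as in the rex handler above.
    nodeName ErlNull = Just (ErlAtom (nodename ++ "@127.0.0.1"))
    nodeName _       = Nothing
    -- Illustrative only: one argument, which must itself be a list.
    reverseList (ErlList [ErlList xs]) = Just (ErlList (reverse xs))
    reverseList _                      = Nothing

-- Look up the handler and run it; fall back to the same atom the
-- current code returns when nothing matches.
dispatch :: String -> String -> String -> ErlType -> ErlType
dispatch nodename m f args =
  maybe fallback id (lookup (m, f) (table nodename) >>= ($ args))
  where
    fallback = ErlAtom "haskell_equals_very_yes"
```

Adding a function then means adding one row to `table` instead of another `case` clause.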

gen_haskell?

VERY YES!

  • Use port commands to start GHC instead of Java!
  • Catch the output
  • Mostly cut and paste from gen_java

It works!

RPC Test app

➜ rpc_test ./_rel/rpc_test/bin/rpc_test console
Exec: /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/erts-5.10.3/bin/erlexec -boot /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/rpc_test -env ERL_LIBS /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/lib -config /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/sys.config -args_file /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test/releases/0.0.1/vm.args -- console
Root: /Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test
/Users/joe/dev/joedevivo/rpc_test/_rel/rpc_test
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

15:33:17.970 [info] Application lager started on node 'rpc_test@127.0.0.1'
15:33:17.970 [info] Application gen_java started on node 'rpc_test@127.0.0.1'
15:33:17.970 [info] Application gen_haskell started on node 'rpc_test@127.0.0.1'
15:33:17.973 [info] [gen_java][my_java] starting (pid: <0.89.0>)
15:33:17.975 [info] [gen_java][my_java] cmd: "java -server -cp priv/gen_java.jar com.devivo.gen_java.ErlangServer gen_java_my_java_rpc_test@127.0.0.1 RVDTHTVOBAMGCPHVSWZW 10"
15:33:17.981 [info] [gen_java][my_java] startup: "7399"
15:33:18.066 [info] [gen_java][my_java] startup: "Starting OTP Node 'gen_java_my_java_rpc_test@127.0.0.1' with cookie RVDTHTVOBAMGCPHVSWZW"
15:33:18.099 [info] [gen_java][my_java] startup: "Started node: gen_java_my_java_rpc_test@127.0.0.1"
15:33:18.099 [info] [gen_java][my_java] OS Pid: "7399"
15:33:18.119 [info] [gen_java][my_java] Thread Pool Size : 10
15:33:18.123 [info] [gen_hasekll][my_haskell] starting (pid: <0.95.0>)
15:33:18.123 [info] [gen_haskell][my_haskell] cmd: "runghc Node gen_haskell_my_haskell_rpc_test"
15:33:18.128 [info] [gen_haskell][my_haskell] startup: "7402"
15:33:18.600 [info] [gen_haskell][my_haskell] startup: "Starting Node: gen_haskell_my_haskell_rpc_test"
15:33:18.600 [info] [gen_haskell][my_haskell] startup: "mbox: MBox ErlPid (ErlAtom \"gen_haskell_my_haskell_rpc_test\") 1 0 1 MVar *self*"
15:33:18.600 [info] [gen_haskell][my_haskell] OS Pid: "7402"
15:33:18.601 [info] [gen_haskell][my_haskell] rpc:call('gen_haskell_my_haskell_rpc_test@127.0.0.1', erlang, node, []) = {badrpc,nodedown}
15:33:19.606 [info] [gen_haskell][my_haskell] rpc:call('gen_haskell_my_haskell_rpc_test@127.0.0.1', erlang, node, []) = 'gen_haskell_my_haskell_rpc_test@127.0.0.1'
15:33:19.607 [info] [gen_haskell][my_haskell] rpc erlang:node(ErlNull)
15:33:19.607 [info] [gen_haskell][my_haskell] rpc erlang:link(ErlList [ErlPid (ErlAtom "rpc_test@127.0.0.1") 95 0 3])
15:33:19.607 [info] Application rpc_test started on node 'rpc_test@127.0.0.1'
Eshell V5.10.3  (abort with ^G)
(rpc_test@127.0.0.1)1> my_java:call(erlang, node, []).
'gen_java_my_java_rpc_test@127.0.0.1'
(rpc_test@127.0.0.1)2> my_haskell:call(erlang, node, []).
'gen_haskell_my_haskell_rpc_test@127.0.0.1'
15:33:39.509 [info] [gen_haskell][my_haskell] rpc erlang:node(ErlNull)
(rpc_test@127.0.0.1)3> my_haskell:call(erlang, node, [1]).
haskell_equals_very_yes
15:33:46.656 [info] [gen_haskell][my_haskell] rpc erlang:node(ErlString "\SOH")
(rpc_test@127.0.0.1)4> my_java:call(erlang, node, [1]).
{error,"java.lang.ClassCastException: com.ericsson.otp.erlang.OtpErlangString cannot be cast to com.ericsson.otp.erlang.OtpErlangList"}
(rpc_test@127.0.0.1)5> q().
ok
15:33:57.068 [info] [gen_haskell][my_haskell] Sending `rex ! stop` from terminate
(rpc_test@127.0.0.1)6> 15:33:57.074 [info] [gen_java][my_java] Sending `rex ! stop` from terminate
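
The `ErlString "\SOH"` in that log is the argument list `[1]`. Erlang's external term format encodes a list of byte-sized integers as a string, which is most likely also why the Java side received an OtpErlangString where it expected an OtpErlangList. A sketch of normalizing arguments before matching on them follows; `argList` is a hypothetical helper and `ErlType` is a cut-down stand-in for the library type.

```haskell
import Data.Char (ord)

-- Stand-in for the library's term type (assumption: trimmed to what
-- this sketch needs).
data ErlType
  = ErlNull
  | ErlInt Int
  | ErlAtom String
  | ErlString String
  | ErlList [ErlType]
  deriving (Eq, Show)

-- Turn whatever arrived in the Args position into a plain Haskell list.
-- A string is treated as the list of its character codes, so [1] and
-- "\SOH" come out the same. Non-list terms are rejected.
argList :: ErlType -> Maybe [ErlType]
argList ErlNull       = Just []
argList (ErlString s) = Just (map (ErlInt . ord) s)
argList (ErlList xs)  = Just xs
argList _             = Nothing
```

With this in place a handler can match on `[]` or `[ErlInt 1]` without caring which wire encoding the sender picked.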

Fin

erlang-haskell.gif

Author: Joe DeVivo

Created: 2015-04-03 Fri 09:15

