clojure - Persisting State from a DRPC Spout in Trident -
i'm experimenting storm , trident project, , i'm using clojure , marceline so. i'm trying expand wordcount example given on the marceline page, such sentence spout comes drpc call rather local spout. i'm having problems think stem fact drpc stream needs have result return client, drpc call return null, , update persisted data.
(defn build-topology ([] (let [trident-topology (tridenttopology.)] (let [ ;; ### 2 alternatives here ### ;collect-stream (t/new-stream trident-topology "words" (mk-fixed-batch-spout 3)) collect-stream (t/drpc-stream trident-topology "words") ] (-> collect-stream (t/group-by ["args"]) (t/persistent-aggregate (memorymapstate$factory.) ["args"] count-words ["count"])) (.build trident-topology)))))
there 2 alternatives in code - 1 using fixed batch spout loads no problem, when try load code using drpc stream instead, error:
invalidtopologyexception(msg:component: [b-2] subscribes non-existent component [$mastercoord-bg0])
i believe error comes fact drpc stream must trying subscribe output in order have return client - persistent-aggregate
doesn't offer such outputs subscribe to.
so how can set topology drpc stream leads persisted data being updated?
minor update: looks might not possible :( https://issues.apache.org/jira/browse/storm-38
Comments
Post a Comment