Unable to pull events from Flume in a Spark Streaming application
I am trying to integrate Flume with a Spark Streaming application. I am running the Spark Scala example FlumePollingEventCount to pull events from Flume, and the Spark job runs on a single machine.
I have the following configuration:
Avro source -> memory channel -> Spark sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.1.36
a1.sources.r1.port = 41414

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.1.36
a1.sinks.k1.port = 41415

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
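For reference, the Spark side of the pull-based approach looks roughly like the following. This is only a minimal sketch, assuming the Spark 1.x FlumeUtils API and the Spark sink address from the configuration above (192.168.1.36:41415); the application name and batch interval are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingEventCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Poll the Spark sink (a1.sinks.k1) on port 41415, not the Avro source port.
    val stream = FlumeUtils.createPollingStream(ssc, "192.168.1.36", 41415)

    // Count the events received in each batch and print the result.
    stream.count().map(cnt => s"Received $cnt flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}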
I am sending events to port 41414 using an Avro client, but the Spark Streaming job is not able to receive any events.
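For completeness, this is one way to send such test events with the Flume RPC client SDK (flume-ng-sdk); it is only a sketch under my assumptions about the setup, and the event body is a placeholder. Any Avro client pointed at the source port behaves the same way.

import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder
import java.nio.charset.Charset

object AvroTestClient {
  def main(args: Array[String]): Unit = {
    // Connect to the Avro source (a1.sources.r1) on port 41414.
    val client = RpcClientFactory.getDefaultInstance("192.168.1.36", 41414)
    try {
      val event = EventBuilder.withBody("test event", Charset.forName("UTF-8"))
      client.append(event) // send a single event to the source
    } finally {
      client.close()
    }
  }
}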
I get the following error when I start the Spark example:
WARN FlumeBatchFetcher: Did not receive events from Flume agent due to error on the Flume agent: begin() called when transaction is OPEN!
At the Flume console I get the following exception:

2016-01-07 19:56:51,344 (Spark Sink Processor Thread - 10) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to process the events. Transaction is being rolled back.
2016-01-07 19:56:51,344 (New I/O worker #5) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from the channel!
2016-01-07 19:56:51,353 (New I/O worker #5) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from the channel!
2016-01-07 19:56:51,355 (Spark Sink Processor Thread - 9) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:80)] Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
Can anyone give me a clue?
I ran into the same problem when the Flume classpath used for the Spark-Flume Approach 2 (pull-based approach) included a different version of spark-streaming-flume_${spark.scala.version} than my application. If you include the exact versions specified in the link above, you should not see this error again.
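As an illustration only (the exact artifacts and versions depend on your Spark and Scala build, so treat these coordinates as assumptions): the Spark application and the Flume agent have to agree on the spark-streaming-flume artifacts. In sbt this might look like:

// build.sbt of the Spark application (example versions; match your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.6.0"
)

// The Flume agent's plugins.d/lib (or classpath) then needs the matching
// spark-streaming-flume-sink_<scala-version> jar, plus the scala-library and
// commons-lang3 jars at the versions listed in the Spark documentation.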