Amazon S3 - Spark error when loading files from S3 with a wildcard
I'm using the PySpark shell and trying to read data from S3 using Spark's file wildcard feature, but I'm getting the following error:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.6 (default, Jul 24 2015 16:07:07)
SparkContext available as sc.
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'aws_access_key_id')
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'aws_secret_access_key')
>>> sc.textFile("s3n://mybucket/path/files-*", use_unicode=False).count()
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(37645) called with curMem=83944, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 36.8 KB, free 265.0 MB)
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(5524) called with curMem=121589, maxMem=278019440
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.0 MB)
16/01/07 18:03:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on salve1:48235 (size: 5.4 KB, free: 265.1 MB)
16/01/07 18:03:02 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
16/01/07 18:03:02 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2
16/01/07 18:03:03 WARN RestS3Service: Response '/path' - Unexpected response code 404, expected 200
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 819, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 810, in sum
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o65.collect.
: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:197)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
    at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1352)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:780)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309)
    at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler
    at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:179)
    at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
    at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
    ... 44 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569)
    at java.lang.StringBuffer.append(StringBuffer.java:369)
    at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:160)
    at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090)
    at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056)
    at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166)
    at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902)
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
When I try to load a single file (without using a wildcard), the code works. Since I need to read about 100k files, I'm wondering what the best way is to load all of the files into an RDD.
Update
It appears to me that the problem is that the key prefix I'm using has roughly 300k files in the S3 "directory" that contains all of the files. The files have a date as a suffix:
s3://mybucket/path/files-2016-01-01-02-00
s3://mybucket/path/files-2016-01-01-02-01
s3://mybucket/path/files-2016-01-01-03-00
s3://mybucket/path/files-2016-01-01-03-01
I was trying to use a wildcard to select just the files for a given date: s3n://mybucket/path/files-2016-01-01-03-*
When I turned on debug logging, I saw that Spark was listing all of the files in the S3 "directory" (s3://mybucket/path/) rather than only the files with the key prefix I specified (s3://mybucket/path/files-2016-01-01-03-). So even though I was only trying to read 2 files, all 300k files were being listed, and that caused the out-of-memory error.
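To make the difference concrete, here is a minimal sketch in plain Python (no Spark or S3 involved) of the two listing strategies. It is only a simplified model of the behaviour seen in the debug log, not Hadoop's actual implementation; the function names `list_like_glob` and `list_by_prefix` are hypothetical, and the key list is the four example keys from above.

```python
import fnmatch
import posixpath


def list_like_glob(all_keys, pattern):
    """Model of the observed behaviour: list the whole parent "directory",
    then filter the listing against the wildcard pattern."""
    parent = posixpath.dirname(pattern) + "/"
    listed = [k for k in all_keys if k.startswith(parent)]
    matched = [k for k in listed if fnmatch.fnmatchcase(k, pattern)]
    return listed, matched


def list_by_prefix(all_keys, pattern):
    """Model of a prefix listing: only keys that start with the literal
    part of the pattern (everything before the first '*') are listed."""
    prefix = pattern.split("*", 1)[0]
    return [k for k in all_keys if k.startswith(prefix)]


keys = [
    "path/files-2016-01-01-02-00",
    "path/files-2016-01-01-02-01",
    "path/files-2016-01-01-03-00",
    "path/files-2016-01-01-03-01",
]
pattern = "path/files-2016-01-01-03-*"

listed, matched = list_like_glob(keys, pattern)
print(len(listed), len(matched))           # every key is listed, two match
print(len(list_by_prefix(keys, pattern)))  # only the two matching keys are listed
```

With 300k keys under the same "directory", the first strategy has to hold the entire listing in memory even when only a couple of keys match, which is consistent with the heap-space error in the stack trace.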
As a workaround, I've listed the files from S3 directly and then made an RDD from the exact file names, and it's working so far for me.
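import subprocess

# List only the keys under the wanted prefix with the AWS CLI (credentials passed via env).
cmd = "env AWS_ACCESS_KEY_ID='myid' AWS_SECRET_ACCESS_KEY='mykey' aws s3 ls s3://mybucket/path/files-2016-01-01-02"
raw_file_list = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n')
# The fourth column of each `aws s3 ls` line is the file name; turn it into a full s3n:// path.
s3_file_list = sc.parallelize(raw_file_list).map(lambda line: "s3n://mybucket/path/%s" % line.split()[3]).collect()
# textFile accepts a comma-separated list of paths, so no wildcard expansion is needed.
rdd = sc.textFile(','.join(s3_file_list), use_unicode=False)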
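The parsing step in that workaround can also be done without Spark and tested on its own. The following is a minimal sketch, assuming each `aws s3 ls` line has four whitespace-separated columns (date, time, size, name) as the `line.split()[3]` above implies; the helper name `build_s3n_paths` and the sample listing text are hypothetical and only for illustration.

```python
def build_s3n_paths(ls_output, base="s3n://mybucket/path/"):
    """Turn `aws s3 ls` output into a comma-separated list of s3n:// paths."""
    paths = []
    for line in ls_output.splitlines():
        fields = line.split()
        # Skip blank lines and entries that do not have the four file columns.
        if len(fields) < 4:
            continue
        # Assumes file names contain no spaces, as in the examples above.
        paths.append(base + fields[3])
    return ",".join(paths)


# Hypothetical listing text in the four-column format.
sample = (
    "2016-01-01 02:05:00       1024 files-2016-01-01-02-00\n"
    "2016-01-01 02:06:00       2048 files-2016-01-01-02-01\n"
)
print(build_s3n_paths(sample))
```

The resulting string can then be passed to sc.textFile(..., use_unicode=False) in the same way as the joined list above.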