python - How to use elasticsearch.helpers.streaming_bulk
Can anyone advise how to use the function elasticsearch.helpers.streaming_bulk instead of elasticsearch.helpers.bulk for indexing data into Elasticsearch?

If I simply swap in streaming_bulk instead of bulk, nothing gets indexed, so I guess it needs to be used in a different form.

The code below creates the index and type, and indexes the data of a CSV file into Elasticsearch in chunks of 500 elements. It is working, but I am wondering whether it is possible to increase performance; that's why I want to try out the streaming_bulk function.

Currently I need 10 minutes to index 1 million rows of a 200 MB CSV document. I use two machines: CentOS 6.6, 8 CPUs, x86_64, CPU MHz 2499.902, 15.574 GB memory total. Not sure whether it can go any faster.
    import csv
    import json
    import elasticsearch
    from elasticsearch import helpers

    es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
    index_name = 'new_index'
    type_name = 'new_type'
    mapping = json.loads(open(config["index_mapping"]).read())  # read mapping from json file
    es.indices.create(index_name)
    es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

    with open(file_to_index, 'rb') as csvfile:
        reader = csv.reader(csvfile)
        # read documents for indexing from csv file, more than a million rows
        content = {"_index": index_name, "_type": type_name}
        batch_chunks = []
        iterator = 0
        for row in reader:
            var = transform_row_for_indexing(row, fields, index_name, type_name, id_name, id_increment)
            id_increment = id_increment + 1
            #var = transform_row_for_indexing(row, fields, index_name, type_name)
            batch_chunks.append(var)
            if iterator % 500 == 0:
                helpers.bulk(es, batch_chunks)
                del batch_chunks[:]
                print "ispucalo batch"
            iterator = iterator + 1
        # indexing of last batch_chunk
        if len(batch_chunks) != 0:
            helpers.bulk(es, batch_chunks)
So streaming_bulk returns an iterator, which means nothing happens until you start iterating over it. The code of the 'bulk' function looks like this:
    success, failed = 0, 0

    # list of errors to be collected is not stats_only
    errors = []

    for ok, item in streaming_bulk(client, actions, **kwargs):
        # go through request-response pairs and detect failures
        if not ok:
            if not stats_only:
                errors.append(item)
            failed += 1
        else:
            success += 1

    return success, failed if stats_only else errors
So just calling streaming_bulk(client, actions, **kwargs) won't do anything. It's not until you iterate over it, as is done in this for loop, that the indexing actually starts to happen.

So in your code: you are welcome to change 'bulk' to 'streaming_bulk', but you need to iterate over the results of streaming_bulk in order to have anything indexed.
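For illustration, here is a minimal sketch of how the indexing loop from the question might be adapted. It assumes the same setup and variables as the snippet above; the gen_actions generator and the doc_id counter are hypothetical names I am introducing here, and chunk_size=500 simply mirrors the batch size of 500 used in the question.

    def gen_actions(reader):
        # hypothetical generator: yields one action dict per CSV row,
        # reusing transform_row_for_indexing() from the question
        doc_id = id_increment
        for row in reader:
            yield transform_row_for_indexing(row, fields, index_name, type_name, id_name, doc_id)
            doc_id = doc_id + 1

    with open(file_to_index, 'rb') as csvfile:
        reader = csv.reader(csvfile)
        # streaming_bulk is lazy: documents are only sent while we iterate over it
        for ok, item in helpers.streaming_bulk(es, gen_actions(reader), chunk_size=500):
            if not ok:
                print "failed to index document:", item

Because the actions are pulled lazily from the generator, the whole CSV never has to be built up in memory as a list of batches.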