python - How to use elasticsearch.helpers.streaming_bulk -


can advice how use function elasticsearch.helpers.streaming_bulk instead elasticsearch.helpers.bulk indexing data elasticsearch.

if change streaming_bulk instead of bulk, nothing gets indexed, guess needs used in different form.

code below creates index, type , index data csv file in chunks of 500 elemens elasticsearch. working wandering possible increse prerformance. that's why want try out streaming_bulk function.

currently need 10 minutes index 1 million rows csv document of 200mb. use 2 machines, centos 6.6 8 cpu-s, x86_64, cpu mhz: 2499.902, mem: 15.574g total. not sure can go faster.

es = elasticsearch.elasticsearch([{'host': 'uxmachine-test', 'port': 9200}]) index_name = 'new_index' type_name = 'new_type' mapping = json.loads(open(config["index_mapping"]).read()) #read mapping json file  es.indices.create(index_name) es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)  open(file_to_index, 'rb') csvfile:     reader = csv.reader(csvfile)        #read documents indexing csv file, more million rows     content = {"_index": index_name, "_type": type_name}     batch_chunks = []     iterator = 0      row in reader:         var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)         id_increment = id_increment + 1         #var = transform_row_for_indexing(row,fields, index_name, type_name)         batch_chunks.append(var)         if iterator % 500 == 0:             helpers.bulk(es,batch_chunks)             del batch_chunks[:]             print "ispucalo batch"         iterator = iterator + 1     # indexing of last batch_chunk     if len(batch_chunks) != 0:         helpers.bulk(es,batch_chunks) 

so streaming bulk returns interator. means nothing happen until start iterating on it. code 'bulk' function looks this:

success, failed = 0, 0  # list of errors collected not stats_only errors = []  ok, item in streaming_bulk(client, actions, **kwargs):     # go through request-reponse pairs , detect failures     if not ok:         if not stats_only:             errors.append(item)         failed += 1     else:         success += 1  return success, failed if stats_only else errors 

so calling streaming_bulk(client, actions, **kwargs) won't anything. it's not until iterate on done in loop indexing starts happen.

so in code. welcome change 'bulk' 'streaming_bulk' need iterate on results of streaming bulk in order have indexed.


Comments

Popular posts from this blog

ruby - Trying to change last to "x"s to 23 -

jquery - Clone last and append item to closest class -

css - Can I use the :after pseudo-element on an input field? -