From 125622eb947ae66d8870abc93a70a7a0bb368970 Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Sun, 25 Jul 2021 18:19:50 -0700 Subject: [PATCH 01/11] added seed to random choice --- benchmark/generate_workload.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/benchmark/generate_workload.py b/benchmark/generate_workload.py index 18d8127..25a5338 100644 --- a/benchmark/generate_workload.py +++ b/benchmark/generate_workload.py @@ -32,9 +32,10 @@ def getUnique(field): df.to_csv(outfile, index=False, mode='a') return df[field] -def generateWorkload(query_name, field="id.orig_h" ,runs=1000): +def generateWorkload(query_name, field="id.orig_h" ,runs=1000, seed=10): workload = [] uniqueVals = getUnique(field) + random.seed(seed) for i in range(runs): uniqueVal = random.choice(uniqueVals) workload.append({'query': query_name + " " + field, 'arguments': [uniqueVal]}) @@ -47,7 +48,7 @@ def generateWorkload(query_name, field="id.orig_h" ,runs=1000): def main(): os.system("rm -fr {}".format(OUTPUT_DIR)) os.system("mkdir {}".format(OUTPUT_DIR)) - generateWorkload("search", "id.orig_h", 1000) + generateWorkload("search", "id.orig_h", 1000, 10) if __name__ == "__main__": main() From 517f8db207233b17d58059ac263a398adee35ece Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Sun, 1 Aug 2021 21:39:30 +0000 Subject: [PATCH 02/11] changed generate workload.py to sample from a list of IPs instead of a set --- benchmark/generate_workload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/generate_workload.py b/benchmark/generate_workload.py index 1c4f91e..3e0c9f2 100644 --- a/benchmark/generate_workload.py +++ b/benchmark/generate_workload.py @@ -25,7 +25,7 @@ def getUnique(field): possibleVals.append(record[field]) except KeyError as e: pass - df[field] = list(set(possibleVals)) + df[field] = possibleVals outfile = '{}/possibleVals.csv'.format(OUTPUT_DIR) with open(outfile, 'w') as outfile: From 856ee9d2d2f9349160960342fbc3a03e3cf643d7 Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Mon, 2 Aug 2021 04:26:44 +0000 Subject: [PATCH 03/11] Removed 'id.orig_h' from being appended to each query name to allow for queries other than simple search --- benchmark/generate_workload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/generate_workload.py b/benchmark/generate_workload.py index 3e0c9f2..52e1730 100644 --- a/benchmark/generate_workload.py +++ b/benchmark/generate_workload.py @@ -38,7 +38,7 @@ def generateWorkload(query_name, field="id.orig_h", runs=1000, seed=42): random.seed(seed) for i in range(runs): uniqueVal = random.choice(uniqueVals) - workload.append({'query': query_name + " " + field, 'arguments': [uniqueVal]}) + workload.append({'query': query_name , 'arguments': [uniqueVal]}) with open('{}/network_log_search_{}.ndjson'.format(OUTPUT_DIR, runs), 'w') as f: writer = ndjson.writer(f, ensure_ascii=False) From 5fa841c9eef2b55bda1fd2c034a8c25b9ecd030d Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Tue, 3 Aug 2021 02:37:19 +0000 Subject: [PATCH 04/11] Added script to issue elastic queries --- benchmark/elastic/elastic_benchmark_config.py | 19 +++ benchmark/elastic/issue_elastic_queries.py | 126 ++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 benchmark/elastic/elastic_benchmark_config.py create mode 100755 benchmark/elastic/issue_elastic_queries.py diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py new file mode 100644 index 0000000..0ec3db5 --- /dev/null +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -0,0 +1,19 @@ +DATA_DIR="workload/trace/test/" + +WORKLOAD_FILE_NAME="network_log_search.ndjson" + +OUTPUT_DIR="workload/trace/test/" + +OUTPUT_FILE_NAME="query_execution_times.csv" + +QUERIES = ["search id.orig_h", + "search id.orig_h + sort ts", + "search id.orig_h + sort ts + slice 5", + "search id.orig_h + count by id.resp_h", + "search id.orig_h + sum orig_bytes", + "search id.orig_h + count by schema"] + + +AGGREGATION_FIELDS= {"search id.orig_h + count by id.resp_h": "id.resp_h", + "search id.orig_h + count by schema": "_path" } + diff --git a/benchmark/elastic/issue_elastic_queries.py b/benchmark/elastic/issue_elastic_queries.py new file mode 100755 index 0000000..8edc1a2 --- /dev/null +++ b/benchmark/elastic/issue_elastic_queries.py @@ -0,0 +1,126 @@ +import json +import ndjson +import numpy as np +import os +import pandas as pd + +from elastic_benchmark_config import * + +input_directory = DATA_DIR +workload_file = WORKLOAD_FILE_NAME +output_directory = OUTPUT_DIR +output_file = OUTPUT_FILE_NAME +queries = QUERIES +aggregation_fields = AGGREGATION_FIELDS + +def runQuery(query): + cmd = query + " > " + output_directory + "query-output.json" + os.system(cmd) + with open(output_directory + "query-output.json", 'r') as j: + contents = json.loads(j.read()) + executiontime = contents["took"] + hits = contents["hits"]["total"]["value"] + + os.system("rm " + output_directory + "query-output.json") + return {"executiontime": executiontime * 0.001, "hits": hits} + +def getQuery(queryname, arguments): + # TODO: edit queries to return all results, not just top 10-1000 + if queryname == "search id.orig_h": + query = "curl -X GET 'http://localhost:9200/test/_search?q=id.orig_h:"+arguments[0]+"&format=json&pretty'" + elif queryname == "search id.orig_h + sort ts": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "search id.orig_h + sort ts + slice 5": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 5,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "search id.orig_h + count by id.resp_h": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" + elif queryname == "search id.orig_h + sum orig_bytes": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" + elif queryname == "search id.orig_h + count by schema": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" + else: + query= "" + return query + +def issueQueries(): + real_list = [] + query_name_list = [] + argument_list = [] + validation_list =[] + + reset() + + with open(input_directory + workload_file ) as f: + data = ndjson.load(f) + for d in data: + if (d['query'] in queries): + queryname = d['query'] + query = getQuery(queryname, d['arguments']) + if query: + if queryname in aggregation_fields: + prepForAggregation(aggregation_fields[queryname]) + + queryoutput = runQuery(query) + real_list.append(queryoutput["executiontime"]) + query_name_list.append(d['query']) + argument_list.append(d['arguments']) + validation_list.append(queryoutput["hits"]) + + if queryname in aggregation_fields: + cleanFromAggregation(aggregation_fields[queryname]) + else: + print('Query Not Supported: ' + str(d['query'])) + else: + print('Query Not Supported: ' + str(d['query'])) + + num_queries = len(real_list) + output_df = pd.DataFrame({'index': range(num_queries)}) + output_df = output_df.set_index('index') + + output_df.insert(0, 'system', ['Elastic'] * num_queries) + output_df.insert(1, 'in_format', ['index'] * num_queries) + output_df.insert(2,'out_format', ['elastic'] * num_queries) + output_df.insert(3, 'query', query_name_list) + output_df.insert(4,'start_time', [0] * num_queries) + output_df.insert(5,'real', real_list) + output_df.insert(6, 'user', [np.nan] * num_queries) + output_df.insert(7, 'sys',[np.nan] * num_queries) + output_df.insert(8, 'argument_0', argument_list) + output_df.insert(9, 'validation', validation_list) + + output_df.to_csv(output_directory + output_file, na_rep='NaN') + return output_df + +def clearcache(): + clearcache = "curl -X POST \"localhost:9200/test/_cache/clear?pretty\"" + os.system(clearcache) + +def restartElastic(): + os.system("sudo systemctl stop elasticsearch.service") + os.system("sudo systemctl start elasticsearch.service") + os.system("sleep 30") + +def reset(): + clearcache() + restartElastic() + +# Aggregation prep/clean due to: +# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. +# Please use a keyword field instead. +# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. +# Note that this can use significant memory." + +def prepForAggregation(fieldname): + prep_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": true}}}'" + os.system(prep_query) + +def cleanFromAggregation(fieldname): + clean_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": false}}}'" + os.system(clean_query) + +def main(): + df = issueQueries() + print df + +if __name__ == "__main__": + main() From 751d331fb3f4f165f2065da667b3597e63ad0b61 Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Tue, 3 Aug 2021 02:45:52 +0000 Subject: [PATCH 05/11] config edits to make script cleaner to run straight from repository --- benchmark/elastic/elastic_benchmark_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py index 0ec3db5..ec63ec5 100644 --- a/benchmark/elastic/elastic_benchmark_config.py +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -1,8 +1,8 @@ DATA_DIR="workload/trace/test/" -WORKLOAD_FILE_NAME="network_log_search.ndjson" +WORKLOAD_FILE_NAME="network_log_search_30.ndjson" -OUTPUT_DIR="workload/trace/test/" +OUTPUT_DIR="elastic-output/" OUTPUT_FILE_NAME="query_execution_times.csv" From d13214f3a834a57ba8a52e52d92bd5d1722c751c Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Thu, 5 Aug 2021 00:06:13 +0000 Subject: [PATCH 06/11] Changes based on PR comments 1 --- benchmark/elastic/elastic_benchmark_config.py | 2 +- benchmark/elastic/issue_elastic_queries.py | 8 +------- benchmark/generate_workload.py | 2 +- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py index ec63ec5..eaa41eb 100644 --- a/benchmark/elastic/elastic_benchmark_config.py +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -1,4 +1,4 @@ -DATA_DIR="workload/trace/test/" +DATA_DIR="../workload/trace/" WORKLOAD_FILE_NAME="network_log_search_30.ndjson" diff --git a/benchmark/elastic/issue_elastic_queries.py b/benchmark/elastic/issue_elastic_queries.py index 8edc1a2..7ac4c99 100755 --- a/benchmark/elastic/issue_elastic_queries.py +++ b/benchmark/elastic/issue_elastic_queries.py @@ -48,7 +48,7 @@ def issueQueries(): argument_list = [] validation_list =[] - reset() + restartElastic() with open(input_directory + workload_file ) as f: data = ndjson.load(f) @@ -91,18 +91,12 @@ def issueQueries(): output_df.to_csv(output_directory + output_file, na_rep='NaN') return output_df -def clearcache(): - clearcache = "curl -X POST \"localhost:9200/test/_cache/clear?pretty\"" - os.system(clearcache) def restartElastic(): os.system("sudo systemctl stop elasticsearch.service") os.system("sudo systemctl start elasticsearch.service") os.system("sleep 30") -def reset(): - clearcache() - restartElastic() # Aggregation prep/clean due to: # "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. diff --git a/benchmark/generate_workload.py b/benchmark/generate_workload.py index 52e1730..2bc6831 100644 --- a/benchmark/generate_workload.py +++ b/benchmark/generate_workload.py @@ -46,7 +46,7 @@ def generateWorkload(query_name, field="id.orig_h", runs=1000, seed=42): writer.writerow(query) def main(): - generateWorkload("search", "id.orig_h", 30) + generateWorkload("search id.orig_h", "id.orig_h", 30) if __name__ == "__main__": main() From 20bb8b49d70a84277cfef22a028fc2d060124921 Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Thu, 12 Aug 2021 15:19:40 -0700 Subject: [PATCH 07/11] added flag for whether to generate list of unique IPs each time --- benchmark/generate_workload.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/benchmark/generate_workload.py b/benchmark/generate_workload.py index 793d590..e620832 100644 --- a/benchmark/generate_workload.py +++ b/benchmark/generate_workload.py @@ -40,9 +40,8 @@ def getUnique(field): df.to_csv(outfile, index=False, mode='a') return df[field] -def generateSearchWorkload(query_name, field="id.orig_h", runs=1000, seed=42): +def generateSearchWorkload(query_name, uniqueVals, field="id.orig_h", runs=1000, seed=42): workload = [] - uniqueVals = getUnique(field) random.seed(seed) for i in range(runs): uniqueVal = random.choice(uniqueVals) @@ -80,8 +79,13 @@ def generateAnalyticsWorkload(query_name, window_size_s=5, runs=1000, seed=42): writer.writerow({'query': query_name, 'arguments': [t.strftime("%Y-%m-%dT%H:%M:%S.%fZ") for t in args]}) -def main(): - generateSearchWorkload("search id.orig_h", "id.orig_h", 30) +def main(newUniqueRun=False): + if newUniqueRun: + uniqueVals = getUnique("id.orig_h") + else: + uniqueVals = pd.read_csv('{}/{}'.format(OUTPUT_DIR, UNIQUE_VALS_FILE), delimiter='\n')["id.orig_h"].to_list() + + generateSearchWorkload("search id.orig_h", uniqueVals, "id.orig_h", 30) generateAnalyticsWorkload("analytics sum orig_bytes", 5, 30) if __name__ == "__main__": From 947492e8af0952da3f33cc818dee05afc657104e Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Fri, 13 Aug 2021 17:38:12 +0000 Subject: [PATCH 08/11] fixed config file to run in repo --- benchmark/elastic/elastic_benchmark_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py index eaa41eb..2de8c0c 100644 --- a/benchmark/elastic/elastic_benchmark_config.py +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -4,7 +4,7 @@ OUTPUT_DIR="elastic-output/" -OUTPUT_FILE_NAME="query_execution_times.csv" +OUTPUT_FILE_NAME="elastic_network_log_search_30_execution_times.csv" QUERIES = ["search id.orig_h", "search id.orig_h + sort ts", From 84c6628d4c816fbe44a538579658f38bb7a2614b Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Thu, 9 Sep 2021 21:11:16 +0000 Subject: [PATCH 09/11] updated script to use search after --- benchmark/elastic/elastic_benchmark_config.py | 11 +-- benchmark/elastic/issue_elastic_queries.py | 78 +++++++++++++------ 2 files changed, 59 insertions(+), 30 deletions(-) mode change 100644 => 100755 benchmark/elastic/elastic_benchmark_config.py diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py old mode 100644 new mode 100755 index 2de8c0c..4885ad3 --- a/benchmark/elastic/elastic_benchmark_config.py +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -6,14 +6,9 @@ OUTPUT_FILE_NAME="elastic_network_log_search_30_execution_times.csv" -QUERIES = ["search id.orig_h", - "search id.orig_h + sort ts", - "search id.orig_h + sort ts + slice 5", - "search id.orig_h + count by id.resp_h", - "search id.orig_h + sum orig_bytes", - "search id.orig_h + count by schema"] +QUERIES = ["search id.orig_h","search id.orig_h + sort ts + slice 1000", "analytics avg orig_bytes"] -AGGREGATION_FIELDS= {"search id.orig_h + count by id.resp_h": "id.resp_h", - "search id.orig_h + count by schema": "_path" } +AGGREGATION_FIELDS= {"search id.orig_h + count by id.resp_h": "id.resp_h", + "search id.orig_h + count by schema": "_path" } diff --git a/benchmark/elastic/issue_elastic_queries.py b/benchmark/elastic/issue_elastic_queries.py index 7ac4c99..522588f 100755 --- a/benchmark/elastic/issue_elastic_queries.py +++ b/benchmark/elastic/issue_elastic_queries.py @@ -16,28 +16,44 @@ def runQuery(query): cmd = query + " > " + output_directory + "query-output.json" os.system(cmd) + + sortvals = [] with open(output_directory + "query-output.json", 'r') as j: contents = json.loads(j.read()) executiontime = contents["took"] hits = contents["hits"]["total"]["value"] - + ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0} + for content in contents["hits"]["hits"]: + sortvals.append(content["sort"]) + if sortvals: + sortval = sortvals.pop()[0] + if sortval: + ret["sortval"] = sortval os.system("rm " + output_directory + "query-output.json") - return {"executiontime": executiontime * 0.001, "hits": hits} + return ret -def getQuery(queryname, arguments): +def getQuery(queryname, arguments, sortval): # TODO: edit queries to return all results, not just top 10-1000 if queryname == "search id.orig_h": - query = "curl -X GET 'http://localhost:9200/test/_search?q=id.orig_h:"+arguments[0]+"&format=json&pretty'" - elif queryname == "search id.orig_h + sort ts": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "search id.orig_h + sort ts + slice 5": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 5,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "search id.orig_h + count by id.resp_h": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" - elif queryname == "search id.orig_h + sum orig_bytes": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" - elif queryname == "search id.orig_h + count by schema": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" + if sortval: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" + else: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" + #elif queryname == "search id.orig_h + sort ts": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "search id.orig_h + sort ts + slice 1000": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + # elif queryname == "search id.orig_h + count by id.resp_h": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" + # elif queryname == "search id.orig_h + sum orig_bytes": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" + # elif queryname == "search id.orig_h + count by schema": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" + elif queryname == "analytics avg orig_bytes": + if sortval: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"search_after\": ["+str(sortval)+"], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" + else: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" else: query= "" return query @@ -47,24 +63,40 @@ def issueQueries(): query_name_list = [] argument_list = [] validation_list =[] - - restartElastic() + sortval_list=[] + + #restartElastic() with open(input_directory + workload_file ) as f: data = ndjson.load(f) for d in data: if (d['query'] in queries): queryname = d['query'] - query = getQuery(queryname, d['arguments']) + query = getQuery(queryname, d['arguments'], 0) if query: if queryname in aggregation_fields: prepForAggregation(aggregation_fields[queryname]) queryoutput = runQuery(query) + sortval = queryoutput['sortval'] + indiv_hits = queryoutput['hits'] + + while indiv_hits >= 10000 and sortval: + sa_query = getQuery(queryname, d['arguments'], sortval) + curr_queryoutput = runQuery(sa_query) + + queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] + queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] + queryoutput["sortval"] = curr_queryoutput["sortval"] + + sortval = curr_queryoutput["sortval"] + indiv_hits = curr_queryoutput["hits"] + real_list.append(queryoutput["executiontime"]) query_name_list.append(d['query']) argument_list.append(d['arguments']) validation_list.append(queryoutput["hits"]) + sortval_list.append(queryoutput["sortval"]) if queryname in aggregation_fields: cleanFromAggregation(aggregation_fields[queryname]) @@ -72,7 +104,7 @@ def issueQueries(): print('Query Not Supported: ' + str(d['query'])) else: print('Query Not Supported: ' + str(d['query'])) - + num_queries = len(real_list) output_df = pd.DataFrame({'index': range(num_queries)}) output_df = output_df.set_index('index') @@ -87,6 +119,8 @@ def issueQueries(): output_df.insert(7, 'sys',[np.nan] * num_queries) output_df.insert(8, 'argument_0', argument_list) output_df.insert(9, 'validation', validation_list) + output_df.insert(10, 'sortval', sortval_list) + output_df.to_csv(output_directory + output_file, na_rep='NaN') return output_df @@ -98,10 +132,10 @@ def restartElastic(): os.system("sleep 30") -# Aggregation prep/clean due to: -# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. -# Please use a keyword field instead. -# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. +# Aggregation prep/clean due to: +# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. +# Please use a keyword field instead. +# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. # Note that this can use significant memory." def prepForAggregation(fieldname): From 2338a8610d7a20fee8869290b1633a0cf4f13e19 Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Tue, 14 Sep 2021 21:34:30 +0000 Subject: [PATCH 10/11] included script with analytics and search queries handled, as well as individual scripts that handle either just simple search or just analytics and search + head --- benchmark/elastic/issue_elastic_queries.py | 100 ++++++----- ...ic_queries_analytics_searchandhead_only.py | 165 +++++++++++++++++ ...ssue_elastic_queries_simple_search_only.py | 167 ++++++++++++++++++ 3 files changed, 391 insertions(+), 41 deletions(-) mode change 100755 => 100644 benchmark/elastic/issue_elastic_queries.py create mode 100644 benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py create mode 100755 benchmark/elastic/issue_elastic_queries_simple_search_only.py diff --git a/benchmark/elastic/issue_elastic_queries.py b/benchmark/elastic/issue_elastic_queries.py old mode 100755 new mode 100644 index 522588f..ced7c63 --- a/benchmark/elastic/issue_elastic_queries.py +++ b/benchmark/elastic/issue_elastic_queries.py @@ -3,6 +3,7 @@ import numpy as np import os import pandas as pd +import time from elastic_benchmark_config import * @@ -13,63 +14,67 @@ queries = QUERIES aggregation_fields = AGGREGATION_FIELDS -def runQuery(query): +def runQuery(query, queryname): cmd = query + " > " + output_directory + "query-output.json" os.system(cmd) - - sortvals = [] + + if queryname == "search id.orig_h": + sortvals = [] + with open(output_directory + "query-output.json", 'r') as j: contents = json.loads(j.read()) executiontime = contents["took"] hits = contents["hits"]["total"]["value"] - ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0} - for content in contents["hits"]["hits"]: - sortvals.append(content["sort"]) - if sortvals: - sortval = sortvals.pop()[0] - if sortval: - ret["sortval"] = sortval + avg = contents["aggregations"]["avg_field"]["value"] + ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0, "avg": avg} + if queryname == "search id.orig_h": + for content in contents["hits"]["hits"]: + sortvals.append(content["sort"]) + if sortvals: + sortval = sortvals.pop()[0] + if sortval: + ret["sortval"] = sortval os.system("rm " + output_directory + "query-output.json") return ret def getQuery(queryname, arguments, sortval): - # TODO: edit queries to return all results, not just top 10-1000 if queryname == "search id.orig_h": if sortval: query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" else: query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" - #elif queryname == "search id.orig_h + sort ts": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" elif queryname == "search id.orig_h + sort ts + slice 1000": query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "analytics avg field": + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'" # elif queryname == "search id.orig_h + count by id.resp_h": # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" # elif queryname == "search id.orig_h + sum orig_bytes": # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" # elif queryname == "search id.orig_h + count by schema": # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" - elif queryname == "analytics avg orig_bytes": - if sortval: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"search_after\": ["+str(sortval)+"], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" - else: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" + # elif queryname == "search id.orig_h + sort ts": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" else: query= "" return query def issueQueries(): - real_list = [] + took_list = [] query_name_list = [] argument_list = [] validation_list =[] - sortval_list=[] + real_list=[] + if queryname == "search id.orig_h": + sortval_list=[] + + restartElastic() - #restartElastic() with open(input_directory + workload_file ) as f: data = ndjson.load(f) for d in data: + start_time = time.time() if (d['query'] in queries): queryname = d['query'] query = getQuery(queryname, d['arguments'], 0) @@ -77,26 +82,30 @@ def issueQueries(): if queryname in aggregation_fields: prepForAggregation(aggregation_fields[queryname]) - queryoutput = runQuery(query) - sortval = queryoutput['sortval'] - indiv_hits = queryoutput['hits'] - - while indiv_hits >= 10000 and sortval: - sa_query = getQuery(queryname, d['arguments'], sortval) - curr_queryoutput = runQuery(sa_query) - - queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] - queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] - queryoutput["sortval"] = curr_queryoutput["sortval"] - - sortval = curr_queryoutput["sortval"] - indiv_hits = curr_queryoutput["hits"] - - real_list.append(queryoutput["executiontime"]) + queryoutput = runQuery(query, queryname) + + if queryname == "search id.orig_h": + sortval = queryoutput['sortval'] + indiv_hits = queryoutput['hits'] + + while indiv_hits >= 10000 and sortval: + sa_query = getQuery(queryname, d['arguments'], sortval) + curr_queryoutput = runQuery(sa_query, queryname) + + queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] + queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] + queryoutput["sortval"] = curr_queryoutput["sortval"] + + sortval = curr_queryoutput["sortval"] + indiv_hits = curr_queryoutput["hits"] + + took_list.append(queryoutput['executiontime']) query_name_list.append(d['query']) argument_list.append(d['arguments']) - validation_list.append(queryoutput["hits"]) - sortval_list.append(queryoutput["sortval"]) + validation_list.append(queryoutput["avg"]) + + if queryname == "search id.orig_h": + sortval_list.append(queryoutput["sortval"]) if queryname in aggregation_fields: cleanFromAggregation(aggregation_fields[queryname]) @@ -105,7 +114,10 @@ def issueQueries(): else: print('Query Not Supported: ' + str(d['query'])) - num_queries = len(real_list) + end_time = time.time() + real_list.append(end_time-start_time) + + num_queries = len(took_list) output_df = pd.DataFrame({'index': range(num_queries)}) output_df = output_df.set_index('index') @@ -119,7 +131,10 @@ def issueQueries(): output_df.insert(7, 'sys',[np.nan] * num_queries) output_df.insert(8, 'argument_0', argument_list) output_df.insert(9, 'validation', validation_list) - output_df.insert(10, 'sortval', sortval_list) + output_df.insert(10, 'instance', ["m5.large"] * num_queries) + output_df.insert(11,'took', took_list) + #output_df.insert(11, 'sortval', sortval_list) + output_df.to_csv(output_directory + output_file, na_rep='NaN') @@ -128,8 +143,11 @@ def issueQueries(): def restartElastic(): os.system("sudo systemctl stop elasticsearch.service") + print("Elastic Stopped") os.system("sudo systemctl start elasticsearch.service") + print("Elastic Started") os.system("sleep 30") + print("Sleep Complete") # Aggregation prep/clean due to: diff --git a/benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py b/benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py new file mode 100644 index 0000000..148752f --- /dev/null +++ b/benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py @@ -0,0 +1,165 @@ +import json +import ndjson +import numpy as np +import os +import pandas as pd +import time + +from elastic_benchmark_config import * + +input_directory = DATA_DIR +workload_file = WORKLOAD_FILE_NAME +output_directory = OUTPUT_DIR +output_file = OUTPUT_FILE_NAME +queries = QUERIES +aggregation_fields = AGGREGATION_FIELDS + +def runQuery(query): + cmd = query + " > " + output_directory + "query-output.json" + os.system(cmd) + + #sortvals = [] + with open(output_directory + "query-output.json", 'r') as j: + contents = json.loads(j.read()) + executiontime = contents["took"] + hits = contents["hits"]["total"]["value"] + avg = contents["aggregations"]["avg_field"]["value"] + ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0, "avg": avg} + # for content in contents["hits"]["hits"]: + # sortvals.append(content["sort"]) + # if sortvals: + # sortval = sortvals.pop()[0] + # if sortval: + # ret["sortval"] = sortval + os.system("rm " + output_directory + "query-output.json") + return ret + +def getQuery(queryname, arguments, sortval): + # TODO: edit queries to return all results, not just top 10-1000 + if queryname == "search id.orig_h": + if sortval: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" + else: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" + #elif queryname == "search id.orig_h + sort ts": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "search id.orig_h + sort ts + slice 1000": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + # elif queryname == "search id.orig_h + count by id.resp_h": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" + # elif queryname == "search id.orig_h + sum orig_bytes": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" + # elif queryname == "search id.orig_h + count by schema": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" + elif queryname == "analytics avg field": + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'" + else: + query= "" + return query + +def issueQueries(): + took_list = [] + query_name_list = [] + argument_list = [] + validation_list =[] + #sortval_list=[] + real_list=[] + + restartElastic() + + + with open(input_directory + workload_file ) as f: + data = ndjson.load(f) + for d in data: + start_time = time.time() + if (d['query'] in queries): + queryname = d['query'] + query = getQuery(queryname, d['arguments'], 0) + if query: + if queryname in aggregation_fields: + prepForAggregation(aggregation_fields[queryname]) + + queryoutput = runQuery(query) + # sortval = queryoutput['sortval'] + # indiv_hits = queryoutput['hits'] + + # while indiv_hits >= 10000 and sortval: + # sa_query = getQuery(queryname, d['arguments'], sortval) + # curr_queryoutput = runQuery(sa_query) + # + # queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] + # queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] + # queryoutput["sortval"] = curr_queryoutput["sortval"] + # + # sortval = curr_queryoutput["sortval"] + # indiv_hits = curr_queryoutput["hits"] + + took_list.append(queryoutput['executiontime']) + query_name_list.append(d['query']) + argument_list.append(d['arguments']) + validation_list.append(queryoutput["avg"]) + # sortval_list.append(queryoutput["sortval"]) + + if queryname in aggregation_fields: + cleanFromAggregation(aggregation_fields[queryname]) + else: + print('Query Not Supported: ' + str(d['query'])) + else: + print('Query Not Supported: ' + str(d['query'])) + + end_time = time.time() + real_list.append(end_time-start_time) + + num_queries = len(took_list) + output_df = pd.DataFrame({'index': range(num_queries)}) + output_df = output_df.set_index('index') + + output_df.insert(0, 'system', ['Elastic'] * num_queries) + output_df.insert(1, 'in_format', ['index'] * num_queries) + output_df.insert(2,'out_format', ['elastic'] * num_queries) + output_df.insert(3, 'query', query_name_list) + output_df.insert(4,'start_time', [0] * num_queries) + output_df.insert(5,'real', real_list) + output_df.insert(6, 'user', [np.nan] * num_queries) + output_df.insert(7, 'sys',[np.nan] * num_queries) + output_df.insert(8, 'argument_0', argument_list) + output_df.insert(9, 'validation', validation_list) + output_df.insert(10, 'instance', ["m5.large"] * num_queries) + output_df.insert(11,'took', took_list) + #output_df.insert(11, 'sortval', sortval_list) + + + + output_df.to_csv(output_directory + output_file, na_rep='NaN') + return output_df + + +def restartElastic(): + os.system("sudo systemctl stop elasticsearch.service") + print("Elastic Stopped") + os.system("sudo systemctl start elasticsearch.service") + print("Elastic Started") + os.system("sleep 30") + print("Sleep Complete") + + +# Aggregation prep/clean due to: +# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. +# Please use a keyword field instead. +# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. +# Note that this can use significant memory." + +def prepForAggregation(fieldname): + prep_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": true}}}'" + os.system(prep_query) + +def cleanFromAggregation(fieldname): + clean_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": false}}}'" + os.system(clean_query) + +def main(): + df = issueQueries() + print df + +if __name__ == "__main__": + main() diff --git a/benchmark/elastic/issue_elastic_queries_simple_search_only.py b/benchmark/elastic/issue_elastic_queries_simple_search_only.py new file mode 100755 index 0000000..b09ad99 --- /dev/null +++ b/benchmark/elastic/issue_elastic_queries_simple_search_only.py @@ -0,0 +1,167 @@ +import json +import ndjson +import numpy as np +import os +import pandas as pd +import time + +from elastic_benchmark_config import * + +input_directory = DATA_DIR +workload_file = WORKLOAD_FILE_NAME +output_directory = OUTPUT_DIR +output_file = OUTPUT_FILE_NAME +queries = QUERIES +aggregation_fields = AGGREGATION_FIELDS + +def runQuery(query): + cmd = query + " > " + output_directory + "query-output.json" + os.system(cmd) + + sortvals = [] + with open(output_directory + "query-output.json", 'r') as j: + contents = json.loads(j.read()) + executiontime = contents["took"] + hits = contents["hits"]["total"]["value"] + ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0} + for content in contents["hits"]["hits"]: + sortvals.append(content["sort"]) + if sortvals: + sortval = sortvals.pop()[0] + if sortval: + ret["sortval"] = sortval + os.system("rm " + output_directory + "query-output.json") + return ret + +def getQuery(queryname, arguments, sortval): + # TODO: edit queries to return all results, not just top 10-1000 + if queryname == "search id.orig_h": + if sortval: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" + else: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" + #elif queryname == "search id.orig_h + sort ts": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "search id.orig_h + sort ts + slice 1000": + query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + # elif queryname == "search id.orig_h + count by id.resp_h": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" + # elif queryname == "search id.orig_h + sum orig_bytes": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" + # elif queryname == "search id.orig_h + count by schema": + # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" + elif queryname == "analytics avg field": + if sortval: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"search_after\": ["+str(sortval)+"], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" + else: + query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" + else: + query= "" + return query + +def issueQueries(): + took_list = [] + query_name_list = [] + argument_list = [] + validation_list =[] + sortval_list=[] + real_list=[] + + restartElastic() + + + with open(input_directory + workload_file ) as f: + data = ndjson.load(f) + for d in data: + start_time = time.time() + if (d['query'] in queries): + queryname = d['query'] + query = getQuery(queryname, d['arguments'], 0) + if query: + if queryname in aggregation_fields: + prepForAggregation(aggregation_fields[queryname]) + + queryoutput = runQuery(query) + sortval = queryoutput['sortval'] + indiv_hits = queryoutput['hits'] + + while indiv_hits >= 10000 and sortval: + sa_query = getQuery(queryname, d['arguments'], sortval) + curr_queryoutput = runQuery(sa_query) + + queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] + queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] + queryoutput["sortval"] = curr_queryoutput["sortval"] + + sortval = curr_queryoutput["sortval"] + indiv_hits = curr_queryoutput["hits"] + + took_list.append(queryoutput["executiontime"]) + query_name_list.append(d['query']) + argument_list.append(d['arguments']) + validation_list.append(queryoutput["hits"]) + sortval_list.append(queryoutput["sortval"]) + + if queryname in aggregation_fields: + cleanFromAggregation(aggregation_fields[queryname]) + else: + print('Query Not Supported: ' + str(d['query'])) + else: + print('Query Not Supported: ' + str(d['query'])) + + end_time = time.time() + real_list.append(end_time-start_time) + + num_queries = len(took_list) + output_df = pd.DataFrame({'index': range(num_queries)}) + output_df = output_df.set_index('index') + + output_df.insert(0, 'system', ['Elastic'] * num_queries) + output_df.insert(1, 'in_format', ['index'] * num_queries) + output_df.insert(2,'out_format', ['elastic'] * num_queries) + output_df.insert(3, 'query', query_name_list) + output_df.insert(4,'start_time', [0] * num_queries) + output_df.insert(5,'real', real_list) + output_df.insert(6, 'user', [np.nan] * num_queries) + output_df.insert(7, 'sys',[np.nan] * num_queries) + output_df.insert(8, 'argument_0', argument_list) + output_df.insert(9, 'validation', validation_list) + output_df.insert(10, 'instance', ["m5.large"] * num_queries) + output_df.insert(11,'took', took_list) + #output_df.insert(11, 'sortval', sortval_list) + + + + output_df.to_csv(output_directory + output_file, na_rep='NaN') + return output_df + + +def restartElastic(): + os.system("sudo systemctl stop elasticsearch.service") + print("Elastic Stopped") + os.system("sudo systemctl start elasticsearch.service") + print("Elastic Started") + os.system("sleep 30") + print("Sleep Complete") + + +# Aggregation prep/clean due to: +# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. +# Please use a keyword field instead. +# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. +# Note that this can use significant memory." + +def prepForAggregation(fieldname): + prep_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": true}}}'" + os.system(prep_query) + +def cleanFromAggregation(fieldname): + clean_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": false}}}'" + os.system(clean_query) + +def main(): + df = issueQueries() + print df + +if __name__ == "__main__": + main() From ab3a827864e36e105bbb3fb50744e0f7ce046148 Mon Sep 17 00:00:00 2001 From: Ananya Krishnaswamy Date: Wed, 15 Sep 2021 08:03:01 +0000 Subject: [PATCH 11/11] updated elastic script to use scroll and fixed validation bug --- benchmark/elastic/elastic_benchmark_config.py | 13 +- benchmark/elastic/issue_elastic_queries.py | 92 ++++------ ...ic_queries_analytics_searchandhead_only.py | 165 ----------------- ...ssue_elastic_queries_simple_search_only.py | 167 ------------------ 4 files changed, 41 insertions(+), 396 deletions(-) mode change 100644 => 100755 benchmark/elastic/issue_elastic_queries.py delete mode 100644 benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py delete mode 100755 benchmark/elastic/issue_elastic_queries_simple_search_only.py diff --git a/benchmark/elastic/elastic_benchmark_config.py b/benchmark/elastic/elastic_benchmark_config.py index 4885ad3..d24a65b 100755 --- a/benchmark/elastic/elastic_benchmark_config.py +++ b/benchmark/elastic/elastic_benchmark_config.py @@ -1,14 +1,19 @@ DATA_DIR="../workload/trace/" -WORKLOAD_FILE_NAME="network_log_search_30.ndjson" +#WORKLOAD_FILE_NAME="network_log_search_needles_30.ndjson" +#WORKLOAD_FILE_NAME="network_log_search_needles_30_2.ndjson" +WORKLOAD_FILE_NAME="network_log_analytics_avg_30.ndjson" +#WORKLOAD_FILE_NAME="debug.ndjson" OUTPUT_DIR="elastic-output/" -OUTPUT_FILE_NAME="elastic_network_log_search_30_execution_times.csv" +#OUTPUT_FILE_NAME="elastic_network_log_search_needles_30_execution_times.csv" +#OUTPUT_FILE_NAME="elastic_network_log_search_needles_30_2_execution_times.csv" +OUTPUT_FILE_NAME="elastic_network_log_analytics_avg_30_execution_times.csv" +#OUTPUT_FILE_NAME="debug.csv" -QUERIES = ["search id.orig_h","search id.orig_h + sort ts + slice 1000", "analytics avg orig_bytes"] +QUERIES = ["search id.orig_h","search id.orig_h + sort ts + slice 1000", "analytics avg field"] AGGREGATION_FIELDS= {"search id.orig_h + count by id.resp_h": "id.resp_h", "search id.orig_h + count by schema": "_path" } - diff --git a/benchmark/elastic/issue_elastic_queries.py b/benchmark/elastic/issue_elastic_queries.py old mode 100644 new mode 100755 index ced7c63..e9bec99 --- a/benchmark/elastic/issue_elastic_queries.py +++ b/benchmark/elastic/issue_elastic_queries.py @@ -17,44 +17,35 @@ def runQuery(query, queryname): cmd = query + " > " + output_directory + "query-output.json" os.system(cmd) - - if queryname == "search id.orig_h": - sortvals = [] - + + last_ports = [] + with open(output_directory + "query-output.json", 'r') as j: contents = json.loads(j.read()) executiontime = contents["took"] hits = contents["hits"]["total"]["value"] - avg = contents["aggregations"]["avg_field"]["value"] - ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0, "avg": avg} - if queryname == "search id.orig_h": - for content in contents["hits"]["hits"]: - sortvals.append(content["sort"]) - if sortvals: - sortval = sortvals.pop()[0] - if sortval: - ret["sortval"] = sortval + if queryname == "analytics avg field": + avg = contents["aggregations"]["avg_field"]["value"] + else: + avg = 0 + if queryname == "search id.orig_h + sort ts + slice 1000": + for content in contents["hits"]["hits"]: + last_ports.append(content["_source"]["id.orig_p"]) + last_port = last_ports.pop() + else: + last_port = 0 + ret = {"executiontime": executiontime * 0.001, "hits": hits, "avg": avg, "last_port": last_port } + os.system("rm " + output_directory + "query-output.json") return ret def getQuery(queryname, arguments, sortval): if queryname == "search id.orig_h": - if sortval: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" - else: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" + query = "curl -X POST \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"query\": {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\"}}}'" elif queryname == "search id.orig_h + sort ts + slice 1000": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "analytics avg field": - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'" - # elif queryname == "search id.orig_h + count by id.resp_h": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" - # elif queryname == "search id.orig_h + sum orig_bytes": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" - # elif queryname == "search id.orig_h + count by schema": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" - # elif queryname == "search id.orig_h + sort ts": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + query = "curl -X GET \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" + elif queryname == "analytics avg field": + query = "curl -X GET \"localhost:9200/test/_search?scroll=1m&pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'" else: query= "" return query @@ -63,14 +54,13 @@ def issueQueries(): took_list = [] query_name_list = [] argument_list = [] - validation_list =[] + avg_list =[] + hits_list =[] real_list=[] - if queryname == "search id.orig_h": - sortval_list=[] + port_list =[] restartElastic() - with open(input_directory + workload_file ) as f: data = ndjson.load(f) for d in data: @@ -79,36 +69,16 @@ def issueQueries(): queryname = d['query'] query = getQuery(queryname, d['arguments'], 0) if query: - if queryname in aggregation_fields: - prepForAggregation(aggregation_fields[queryname]) queryoutput = runQuery(query, queryname) - - if queryname == "search id.orig_h": - sortval = queryoutput['sortval'] - indiv_hits = queryoutput['hits'] - - while indiv_hits >= 10000 and sortval: - sa_query = getQuery(queryname, d['arguments'], sortval) - curr_queryoutput = runQuery(sa_query, queryname) - - queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] - queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] - queryoutput["sortval"] = curr_queryoutput["sortval"] - - sortval = curr_queryoutput["sortval"] - indiv_hits = curr_queryoutput["hits"] took_list.append(queryoutput['executiontime']) query_name_list.append(d['query']) argument_list.append(d['arguments']) - validation_list.append(queryoutput["avg"]) - - if queryname == "search id.orig_h": - sortval_list.append(queryoutput["sortval"]) + avg_list.append(queryoutput["avg"]) + hits_list.append(queryoutput["hits"]) + port_list.append(queryoutput["last_port"]) - if queryname in aggregation_fields: - cleanFromAggregation(aggregation_fields[queryname]) else: print('Query Not Supported: ' + str(d['query'])) else: @@ -130,12 +100,14 @@ def issueQueries(): output_df.insert(6, 'user', [np.nan] * num_queries) output_df.insert(7, 'sys',[np.nan] * num_queries) output_df.insert(8, 'argument_0', argument_list) - output_df.insert(9, 'validation', validation_list) - output_df.insert(10, 'instance', ["m5.large"] * num_queries) + if any(avg_list): + output_df.insert(9, 'validation', avg_list) + elif any(port_list): + output_df.insert(9, 'validation', port_list) + else: + output_df.insert(9, 'validation', hits_list) + output_df.insert(10, 'instance', ["m5d.2xlarge"] * num_queries) output_df.insert(11,'took', took_list) - #output_df.insert(11, 'sortval', sortval_list) - - output_df.to_csv(output_directory + output_file, na_rep='NaN') return output_df diff --git a/benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py b/benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py deleted file mode 100644 index 148752f..0000000 --- a/benchmark/elastic/issue_elastic_queries_analytics_searchandhead_only.py +++ /dev/null @@ -1,165 +0,0 @@ -import json -import ndjson -import numpy as np -import os -import pandas as pd -import time - -from elastic_benchmark_config import * - -input_directory = DATA_DIR -workload_file = WORKLOAD_FILE_NAME -output_directory = OUTPUT_DIR -output_file = OUTPUT_FILE_NAME -queries = QUERIES -aggregation_fields = AGGREGATION_FIELDS - -def runQuery(query): - cmd = query + " > " + output_directory + "query-output.json" - os.system(cmd) - - #sortvals = [] - with open(output_directory + "query-output.json", 'r') as j: - contents = json.loads(j.read()) - executiontime = contents["took"] - hits = contents["hits"]["total"]["value"] - avg = contents["aggregations"]["avg_field"]["value"] - ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0, "avg": avg} - # for content in contents["hits"]["hits"]: - # sortvals.append(content["sort"]) - # if sortvals: - # sortval = sortvals.pop()[0] - # if sortval: - # ret["sortval"] = sortval - os.system("rm " + output_directory + "query-output.json") - return ret - -def getQuery(queryname, arguments, sortval): - # TODO: edit queries to return all results, not just top 10-1000 - if queryname == "search id.orig_h": - if sortval: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" - else: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" - #elif queryname == "search id.orig_h + sort ts": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "search id.orig_h + sort ts + slice 1000": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - # elif queryname == "search id.orig_h + count by id.resp_h": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" - # elif queryname == "search id.orig_h + sum orig_bytes": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" - # elif queryname == "search id.orig_h + count by schema": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" - elif queryname == "analytics avg field": - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{\"aggs\": {\"avg_field\": {\"avg\": { \"field\": \""+arguments[0]+"\"}}}}'" - else: - query= "" - return query - -def issueQueries(): - took_list = [] - query_name_list = [] - argument_list = [] - validation_list =[] - #sortval_list=[] - real_list=[] - - restartElastic() - - - with open(input_directory + workload_file ) as f: - data = ndjson.load(f) - for d in data: - start_time = time.time() - if (d['query'] in queries): - queryname = d['query'] - query = getQuery(queryname, d['arguments'], 0) - if query: - if queryname in aggregation_fields: - prepForAggregation(aggregation_fields[queryname]) - - queryoutput = runQuery(query) - # sortval = queryoutput['sortval'] - # indiv_hits = queryoutput['hits'] - - # while indiv_hits >= 10000 and sortval: - # sa_query = getQuery(queryname, d['arguments'], sortval) - # curr_queryoutput = runQuery(sa_query) - # - # queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] - # queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] - # queryoutput["sortval"] = curr_queryoutput["sortval"] - # - # sortval = curr_queryoutput["sortval"] - # indiv_hits = curr_queryoutput["hits"] - - took_list.append(queryoutput['executiontime']) - query_name_list.append(d['query']) - argument_list.append(d['arguments']) - validation_list.append(queryoutput["avg"]) - # sortval_list.append(queryoutput["sortval"]) - - if queryname in aggregation_fields: - cleanFromAggregation(aggregation_fields[queryname]) - else: - print('Query Not Supported: ' + str(d['query'])) - else: - print('Query Not Supported: ' + str(d['query'])) - - end_time = time.time() - real_list.append(end_time-start_time) - - num_queries = len(took_list) - output_df = pd.DataFrame({'index': range(num_queries)}) - output_df = output_df.set_index('index') - - output_df.insert(0, 'system', ['Elastic'] * num_queries) - output_df.insert(1, 'in_format', ['index'] * num_queries) - output_df.insert(2,'out_format', ['elastic'] * num_queries) - output_df.insert(3, 'query', query_name_list) - output_df.insert(4,'start_time', [0] * num_queries) - output_df.insert(5,'real', real_list) - output_df.insert(6, 'user', [np.nan] * num_queries) - output_df.insert(7, 'sys',[np.nan] * num_queries) - output_df.insert(8, 'argument_0', argument_list) - output_df.insert(9, 'validation', validation_list) - output_df.insert(10, 'instance', ["m5.large"] * num_queries) - output_df.insert(11,'took', took_list) - #output_df.insert(11, 'sortval', sortval_list) - - - - output_df.to_csv(output_directory + output_file, na_rep='NaN') - return output_df - - -def restartElastic(): - os.system("sudo systemctl stop elasticsearch.service") - print("Elastic Stopped") - os.system("sudo systemctl start elasticsearch.service") - print("Elastic Started") - os.system("sleep 30") - print("Sleep Complete") - - -# Aggregation prep/clean due to: -# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. -# Please use a keyword field instead. -# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. -# Note that this can use significant memory." - -def prepForAggregation(fieldname): - prep_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": true}}}'" - os.system(prep_query) - -def cleanFromAggregation(fieldname): - clean_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": false}}}'" - os.system(clean_query) - -def main(): - df = issueQueries() - print df - -if __name__ == "__main__": - main() diff --git a/benchmark/elastic/issue_elastic_queries_simple_search_only.py b/benchmark/elastic/issue_elastic_queries_simple_search_only.py deleted file mode 100755 index b09ad99..0000000 --- a/benchmark/elastic/issue_elastic_queries_simple_search_only.py +++ /dev/null @@ -1,167 +0,0 @@ -import json -import ndjson -import numpy as np -import os -import pandas as pd -import time - -from elastic_benchmark_config import * - -input_directory = DATA_DIR -workload_file = WORKLOAD_FILE_NAME -output_directory = OUTPUT_DIR -output_file = OUTPUT_FILE_NAME -queries = QUERIES -aggregation_fields = AGGREGATION_FIELDS - -def runQuery(query): - cmd = query + " > " + output_directory + "query-output.json" - os.system(cmd) - - sortvals = [] - with open(output_directory + "query-output.json", 'r') as j: - contents = json.loads(j.read()) - executiontime = contents["took"] - hits = contents["hits"]["total"]["value"] - ret = {"executiontime": executiontime * 0.001, "hits": hits, "sortval": 0} - for content in contents["hits"]["hits"]: - sortvals.append(content["sort"]) - if sortvals: - sortval = sortvals.pop()[0] - if sortval: - ret["sortval"] = sortval - os.system("rm " + output_directory + "query-output.json") - return ret - -def getQuery(queryname, arguments, sortval): - # TODO: edit queries to return all results, not just top 10-1000 - if queryname == "search id.orig_h": - if sortval: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"search_after\": ["+str(sortval)+"] , \"sort\": [{\"ts\": \"asc\"}],\"query\" : {\"term\" : {\"id.orig_h\" : \""+arguments[0]+"\" } } } '" - else: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d' { \"sort\": [{\"ts\": \"asc\"}], \"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" } } } '" - #elif queryname == "search id.orig_h + sort ts": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"sort\" : [\"ts\" ],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - elif queryname == "search id.orig_h + sort ts + slice 1000": - query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"size\": 1000,\"sort\" : [{\"ts\" : {\"order\": \"asc\" }}],\"query\" : {\"term\" : { \"id.orig_h\" : \""+arguments[0]+"\" }}}'" - # elif queryname == "search id.orig_h + count by id.resp_h": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"id.resp_h\": {\"terms\": {\"field\": \"id.resp_h\"}}}}'" - # elif queryname == "search id.orig_h + sum orig_bytes": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"orig_bytes\": {\"sum\": {\"field\": \"orig_bytes\"}}}}'" - # elif queryname == "search id.orig_h + count by schema": - # query = "curl -X GET \"localhost:9200/test/_search?format=json&pretty\" -H 'Content-Type: application/json' -d'{\"query\" : {\"term\" : { \"id.orig_h\" : \""+ arguments[0] +"\" }}, \"aggs\": {\"schema\": {\"terms\": {\"field\": \"_path\"}}}}'" - elif queryname == "analytics avg field": - if sortval: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"search_after\": ["+str(sortval)+"], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" - else: - query = "curl -X GET \"localhost:9200/test/_search?pretty\" -H 'Content-Type: application/json' -d'{ \"sort\": [{\"ts\": \"asc\"}], \"aggs\": {\"time\": {\"date_range\": {\"field\": \"ts\",\"ranges\": [{\"from\": \""+arguments[0]+"\", \"to\": \""+arguments[1]+"\"}]},\"aggs\": {\"avg_orig_bytes\": {\"avg\": { \"field\": \"orig_bytes\"}}}}}}'" - else: - query= "" - return query - -def issueQueries(): - took_list = [] - query_name_list = [] - argument_list = [] - validation_list =[] - sortval_list=[] - real_list=[] - - restartElastic() - - - with open(input_directory + workload_file ) as f: - data = ndjson.load(f) - for d in data: - start_time = time.time() - if (d['query'] in queries): - queryname = d['query'] - query = getQuery(queryname, d['arguments'], 0) - if query: - if queryname in aggregation_fields: - prepForAggregation(aggregation_fields[queryname]) - - queryoutput = runQuery(query) - sortval = queryoutput['sortval'] - indiv_hits = queryoutput['hits'] - - while indiv_hits >= 10000 and sortval: - sa_query = getQuery(queryname, d['arguments'], sortval) - curr_queryoutput = runQuery(sa_query) - - queryoutput["executiontime"] = queryoutput["executiontime"] + curr_queryoutput["executiontime"] - queryoutput["hits"] = queryoutput["hits"] + curr_queryoutput["hits"] - queryoutput["sortval"] = curr_queryoutput["sortval"] - - sortval = curr_queryoutput["sortval"] - indiv_hits = curr_queryoutput["hits"] - - took_list.append(queryoutput["executiontime"]) - query_name_list.append(d['query']) - argument_list.append(d['arguments']) - validation_list.append(queryoutput["hits"]) - sortval_list.append(queryoutput["sortval"]) - - if queryname in aggregation_fields: - cleanFromAggregation(aggregation_fields[queryname]) - else: - print('Query Not Supported: ' + str(d['query'])) - else: - print('Query Not Supported: ' + str(d['query'])) - - end_time = time.time() - real_list.append(end_time-start_time) - - num_queries = len(took_list) - output_df = pd.DataFrame({'index': range(num_queries)}) - output_df = output_df.set_index('index') - - output_df.insert(0, 'system', ['Elastic'] * num_queries) - output_df.insert(1, 'in_format', ['index'] * num_queries) - output_df.insert(2,'out_format', ['elastic'] * num_queries) - output_df.insert(3, 'query', query_name_list) - output_df.insert(4,'start_time', [0] * num_queries) - output_df.insert(5,'real', real_list) - output_df.insert(6, 'user', [np.nan] * num_queries) - output_df.insert(7, 'sys',[np.nan] * num_queries) - output_df.insert(8, 'argument_0', argument_list) - output_df.insert(9, 'validation', validation_list) - output_df.insert(10, 'instance', ["m5.large"] * num_queries) - output_df.insert(11,'took', took_list) - #output_df.insert(11, 'sortval', sortval_list) - - - - output_df.to_csv(output_directory + output_file, na_rep='NaN') - return output_df - - -def restartElastic(): - os.system("sudo systemctl stop elasticsearch.service") - print("Elastic Stopped") - os.system("sudo systemctl start elasticsearch.service") - print("Elastic Started") - os.system("sleep 30") - print("Sleep Complete") - - -# Aggregation prep/clean due to: -# "Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. -# Please use a keyword field instead. -# Alternatively, set fielddata=true on [id.resp_h] in order to load field data by uninverting the inverted index. -# Note that this can use significant memory." - -def prepForAggregation(fieldname): - prep_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": true}}}'" - os.system(prep_query) - -def cleanFromAggregation(fieldname): - clean_query = "curl -X PUT \"localhost:9200/test/_mapping/_doc?include_type_name=true&pretty\" -H 'Content-Type: application/json' -d'{\"properties\": { \""+ fieldname +"\" : { \"type\": \"text\", \"fielddata\": false}}}'" - os.system(clean_query) - -def main(): - df = issueQueries() - print df - -if __name__ == "__main__": - main()