Package IDAscope :: Package idascope :: Package core :: Module SemanticIdentifier
[hide private]
[frames] | no frames]

Source Code for Module IDAscope.idascope.core.SemanticIdentifier

  1  #!/usr/bin/python 
  2  ######################################################################## 
  3  # Copyright (c) 2012 
  4  # Daniel Plohmann <daniel.plohmann<at>gmail<dot>com> 
  5  # Alexander Hanel <alexander.hanel<at>gmail<dot>com> 
  6  # All rights reserved. 
  7  ######################################################################## 
  8  # 
  9  #  This file is part of IDAscope 
 10  # 
 11  #  IDAscope is free software: you can redistribute it and/or modify it 
 12  #  under the terms of the GNU General Public License as published by 
 13  #  the Free Software Foundation, either version 3 of the License, or 
 14  #  (at your option) any later version. 
 15  # 
 16  #  This program is distributed in the hope that it will be useful, but 
 17  #  WITHOUT ANY WARRANTY; without even the implied warranty of 
 18  #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 19  #  General Public License for more details. 
 20  # 
 21  #  You should have received a copy of the GNU General Public License 
 22  #  along with this program.  If not, see 
 23  #  <http://www.gnu.org/licenses/>. 
 24  # 
 25  ######################################################################## 
 26  # Credits: 
 27  # - Thanks to Branko Spasojevic for contributing a function for 
 28  #   finding and renaming potential wrapper functions. 
 29  ######################################################################## 
 30   
 31  import json 
 32  import re 
 33   
 34  import JsonHelper 
 35   
 36  from IdaProxy import IdaProxy 
 37  from idascope.core.structures.FunctionContext import FunctionContext 
 38  from idascope.core.structures.CallContext import CallContext 
 39  from idascope.core.structures.ParameterContext import ParameterContext 
 40   
 41   
42 -class SemanticIdentifier():
43 """ 44 A module to analyze and explore an IDB for semantics. For a set of API names, references to these 45 are identified and used for creating context and allowing tagging of them. 46 """ 47
def __init__(self, config_filename):
    """
    Set up a new identifier and load its semantic definitions right away.
    @param config_filename: filename of a semantic configuration file
    @type config_filename: str
    """
    print("loading SemanticIdentifier")
    # helper module, IDA access layer and the data object classes are kept on the instance
    self.re = re
    self.ida_proxy = IdaProxy()
    self.FunctionContext, self.CallContext, self.ParameterContext = \
        FunctionContext, CallContext, ParameterContext
    # defaults; separator and definitions are overwritten by load_config() below
    self.last_result = {}
    self.semantic_definitions = []
    self.renaming_seperator = "_"
    self.load_config(config_filename)
60
def load_config(self, config_filename):
    """
    Loads a semantic configuration file and collects all definitions from it.
    The file is parsed as JSON (decoded through I{JsonHelper.decode_dict}) and has to provide the keys
    C{"renaming_seperator"} and C{"semantic_definitions"}.
    @param config_filename: filename of a semantic configuration file
    @type config_filename: str
    @raise IOError: if the file cannot be opened or read
    @raise ValueError: if the file content is not valid JSON
    @raise KeyError: if one of the two required keys is missing
    """
    # the context manager closes the handle even when reading fails
    with open(config_filename, "r") as config_file:
        config = config_file.read()
    parsed_config = json.loads(config, object_hook=JsonHelper.decode_dict)
    # look up both keys before touching the instance, so an incomplete config
    # cannot leave the object with only one of the two settings replaced
    renaming_seperator = parsed_config["renaming_seperator"]
    semantic_definitions = parsed_config["semantic_definitions"]
    self.renaming_seperator = renaming_seperator
    self.semantic_definitions = semantic_definitions
73
75 """ 76 Calculates the number of basic blocks for a given function by walking its FlowChart. 77 @param function_address: function address to calculate the block count for 78 @type function_address: int 79 """ 80 number_of_blocks = 0 81 try: 82 func_chart = self.ida_proxy.FlowChart(self.ida_proxy.get_func(function_address)) 83 for block in func_chart: 84 number_of_blocks += 1 85 except: 86 pass 87 return number_of_blocks
88
90 """ 91 returns the number of basic blocks for the function containing the queried address, 92 based on the value stored in the last scan result. 93 94 If the number of basic blocks for this function has never been calculated, zero is returned. 95 @param function_address: function address to get the block count for 96 @type function_address: int 97 @return: (int) The number of blocks in th e function 98 """ 99 number_of_blocks = 0 100 function_address = self.get_function_address_for_address(address) 101 if function_address in self.last_result.keys(): 102 number_of_blocks = self.last_result[function_address].number_of_basic_blocks 103 return number_of_blocks
104
def scan(self):
    """
    Run every available scanning technique over the IDB: first the reference based scan,
    afterwards the full code scan.
    """
    for technique in (self.scan_by_references, self.scan_all_code):
        technique()
111
def scan_by_references(self):
    """
    Scan by references to API names, based on the definitions loaded from the config file.
    This is highly efficient because we only touch places in the IDB that actually have references
    to our API names of interest.

    For every code or data reference to one of the configured API names, a CallContext is created and
    attached to the FunctionContext of the function containing the reference. The outcome, a dictionary
    mapping function addresses to FunctionContext objects, replaces I{self.last_result}.
    """
    scan_result = {}
    for semantic_group in self.semantic_definitions:
        semantic_group_tag = semantic_group["tag"]
        for api_name in semantic_group["api_names"]:
            api_address = self.ida_proxy.LocByName(api_name)
            # an address referenced both as code and as data is only handled once
            ref_addrs = set(self.ida_proxy.CodeRefsTo(api_address, 0))
            ref_addrs.update(self.ida_proxy.DataRefsTo(api_address))
            for ref in ref_addrs:
                function_name = self.ida_proxy.GetFunctionName(ref)
                function_address = self.ida_proxy.LocByName(function_name)
                # NOTE(review): all references for which no function name is found share whatever
                # LocByName() returns for that name and end up in one entry - confirm this is intended.
                function_ctx = scan_result.get(function_address)
                if function_ctx is None:
                    # first reference seen in this function: create its context exactly once
                    function_ctx = self.FunctionContext()
                    function_ctx.function_address = function_address
                    function_ctx.function_name = function_name
                    function_ctx.has_dummy_name = \
                        (self.ida_proxy.GetFlags(function_address) & self.ida_proxy.FF_LABL) > 0
                    scan_result[function_address] = function_ctx
                call_ctx = self.CallContext()
                call_ctx.called_function_name = api_name
                call_ctx.address_of_call = ref
                call_ctx.called_address = api_address
                call_ctx.tag = semantic_group_tag
                call_ctx.parameter_contexts = self._resolve_api_call(call_ctx)
                function_ctx.call_contexts.append(call_ctx)
    self.last_result = scan_result
144
def scan_all_code(self):
    """
    Placeholder without any effect so far. Eventually this function is meant to enumerate all
    instructions and collect information such as the number of instructions, the number of basic
    blocks and the references to and from functions.
    """
    # fields that are planned to be accumulated for every function:
    #   number_of_basic_blocks = 0
    #   number_of_instructions = 0
    #   number_of_xrefs_from = 0
    #   number_of_xrefs_to = 0
    return None
157
def get_function_address_for_address(self, address):
    """
    Resolve the function that contains the queried address: the name of the surrounding function is
    looked up first and then translated back into an address.
    @param address: address to check the function address for
    @type address: int
    @return: (int) The start address of the function containing this address
    """
    containing_function_name = self.ida_proxy.GetFunctionName(address)
    function_start = self.ida_proxy.LocByName(containing_function_name)
    return function_start
166
168 """ 169 Calculate the number of functions in all segments. 170 @return: (int) the number of functions found. 171 """ 172 number_of_functions = 0 173 for seg_ea in self.ida_proxy.Segments(): 174 for function_ea in self.ida_proxy.Functions(self.ida_proxy.SegStart(seg_ea), self.ida_proxy.SegEnd(seg_ea)): 175 number_of_functions += 1 176 return number_of_functions
177
179 """ 180 Get all function address that have been covered by the last scanning. 181 @return: (list of int) The addresses of covered functions. 182 """ 183 return self.last_result.keys()
184
186 """ 187 Get all function address with a dummy name that have been covered by the last scanning. 188 @return: (list of int) The addresses of covered functions. 189 """ 190 return [addr for addr in self.last_result.keys() if self.last_result[addr].has_dummy_name]
191
def get_tags(self):
    """
    Collect every distinct tag that occurs in the last scan result, in order of first appearance.
    @return (list of str) The tags found.
    """
    unique_tags = []
    for function_ctx in self.last_result.values():
        for call_ctx in function_ctx.call_contexts:
            if call_ctx.tag in unique_tags:
                continue
            unique_tags.append(call_ctx.tag)
    return unique_tags
203
def get_tags_for_function_address(self, address):
    """
    Collect the distinct tags of the function containing the queried address, in order of first
    appearance. A function that is not part of the last scan result has no tags.
    @param address: address in the target function
    @type address: int
    @return: (list of str) The tags for the function containing the queried address
    """
    owning_function = self.get_function_address_for_address(address)
    if owning_function not in self.last_result:
        return []
    found_tags = []
    for call_ctx in self.last_result[owning_function].call_contexts:
        if call_ctx.tag not in found_tags:
            found_tags.append(call_ctx.tag)
    return found_tags
218
def get_tag_count_for_function_address(self, tag, address):
    """
    Count how often a certain tag occurs in the function containing the queried address.
    @param tag: a tag as included in semantic definitions
    @type tag: str
    @param address: address in the target function
    @type address: int
    @return: (int) The number of occurrences for this tag in the function
    """
    owning_function = self.get_function_address_for_address(address)
    # the tag list is empty for functions missing from the last result, which also
    # protects the dictionary access below
    if tag not in self.get_tags_for_function_address(owning_function):
        return 0
    contexts = self.last_result[owning_function].call_contexts
    return sum(1 for call_ctx in contexts if call_ctx.tag == tag)
235
def get_tagged_apis_for_function_address(self, address):
    """
    Get all call contexts for the function containing the queried address.
    Only call contexts carrying a non-empty tag are returned. If the function is not part of the last
    scan result, an empty list is returned, so the result can always be iterated.
    @param address: address in the target function
    @type address: int
    @return: (list of CallContext data objects) The call contexts identified by the scanning of this function
    """
    function_address = self.get_function_address_for_address(address)
    if function_address not in self.last_result:
        # unknown function: hand back a list instead of silently falling through to None
        return []
    all_call_ctx = self.last_result[function_address].call_contexts
    return [call_ctx for call_ctx in all_call_ctx if call_ctx.tag != ""]
247
249 """ 250 Get all call contexts for all functions 251 @return: a dictionary with key/value entries of the following form: (function_address, 252 dict((call_address, tag))) 253 """ 254 functions_and_tags = {} 255 for function in self.get_identified_function_addresses(): 256 call_contexts = self.get_tagged_apis_for_function_address(function) 257 if function not in functions_and_tags.keys(): 258 functions_and_tags[function] = {} 259 for call_ctx in call_contexts: 260 functions_and_tags[function][call_ctx.address_of_call] = call_ctx.tag 261 return functions_and_tags
262
def get_functions_to_rename(self):
    """
    Propose new names for all functions of the last scan result that can be renamed. Only functions with
    the standard IDA name I{sub_[0-9A-F]+} will be considered for renaming. The proposed name is the
    current name, prefixed with every tag of the function that is not already contained in it.
    @return: a list of dictionaries, each consisting of three tuples: ("old_function_name", str), \
             ("new_function_name", str), ("function_address", int)
    """
    rename_candidates = []
    for address, function_ctx in self.last_result.items():
        # skip everything that no longer carries a dummy name
        still_dummy = (self.ida_proxy.GetFlags(address) & self.ida_proxy.FF_LABL) > 0
        if not still_dummy:
            continue
        proposed_name = function_ctx.function_name
        # tags are applied in reverse alphabetical order, each one is prepended
        for tag in sorted(self.get_tags_for_function_address(address), reverse=True):
            if tag in proposed_name:
                continue
            proposed_name = tag + self.renaming_seperator + proposed_name
        candidate = {
            "old_function_name": function_ctx.function_name,
            "new_function_name": proposed_name,
            "function_address": address,
        }
        rename_candidates.append(candidate)
    return rename_candidates
283
def rename_functions(self):
    """
    Apply the names proposed by I{get_functions_to_rename} to the IDB. A function is only renamed if
    its current name still equals the name recorded during the last scan.
    """
    for candidate in self.get_functions_to_rename():
        target_address = candidate["function_address"]
        current_name = self.ida_proxy.GetFunctionName(target_address)
        if candidate["old_function_name"] != current_name:
            # name changed since the scan, leave it untouched
            continue
        self.ida_proxy.MakeNameEx(target_address, candidate["new_function_name"],
                                  self.ida_proxy.SN_NOWARN)
292
294 for seg_ea in self.ida_proxy.Segments(): 295 for func_ea in self.ida_proxy.Functions(self.ida_proxy.SegStart(seg_ea), self.ida_proxy.SegEnd(seg_ea)): 296 if (self.ida_proxy.GetFlags(func_ea) & 0x8000) != 0: 297 # dummy function check if wrapper material 298 func_end = self.ida_proxy.GetFunctionAttr(func_ea, self.ida_proxy.FUNCATTR_END) 299 # wrappers are likely short 300 if (func_end - func_ea) > 0 and (func_end - func_ea) < 0x100: 301 nr_calls = 0 302 for i_ea in self.ida_proxy.FuncItems(func_ea): 303 if self.ida_proxy.GetMnem(i_ea) == 'call': 304 nr_calls += 1 305 if nr_calls > 1: 306 break 307 call_dst = list(self.ida_proxy.CodeRefsFrom(i_ea, 0)) 308 if len(call_dst) == 0: 309 continue 310 311 call_dst = call_dst[0] 312 w_name = '' 313 if (self.ida_proxy.GetFunctionFlags(call_dst) & self.ida_proxy.FUNC_LIB) != 0 or \ 314 (self.ida_proxy.GetFlags(func_ea) & self.ida_proxy.FF_LABL) == 0: 315 w_name = self.ida_proxy.Name(call_dst) 316 if nr_calls == 1 and len(w_name) > 0: 317 rval = False 318 name_suffix = 0 319 while rval == False: 320 if name_suffix > 40: 321 print("Potentially more than 50 wrappers for function %s, " \ 322 "please report IDB" % w_name) 323 break 324 if self.ida_proxy.Demangle(w_name, \ 325 self.ida_proxy.GetLongPrm(self.ida_proxy.INF_SHORT_DN)) != w_name: 326 f_name = w_name + '_' + str(name_suffix) 327 elif name_suffix > 0: 328 f_name = w_name + '__w' + str(name_suffix) 329 else: 330 f_name = w_name + '__w' 331 name_suffix += 1 332 rval = self.ida_proxy.MakeNameEx(func_ea, f_name, \ 333 self.ida_proxy.SN_NOCHECK | self.ida_proxy.SN_NOWARN) 334 if rval == True: 335 print("Identified and renamed potential wrapper @ [%08x] to [%s]" % (func_ea, f_name))
336
def get_parameters_for_call_address(self, call_address):
    """
    Get the parameters for the given address of a function call.
    The call has to be one of the tagged API calls of the function containing I{call_address};
    otherwise an empty list is returned.
    @param call_address: address of the target call to inspect
    @type call_address: int
    @return: a list of ParameterContext data objects.
    """
    target_function_address = self.get_function_address_for_address(call_address)
    # "or []" keeps the loop safe when no call contexts are reported for the function at all
    all_tagged_apis_in_function = self.get_tagged_apis_for_function_address(target_function_address) or []
    for api in all_tagged_apis_in_function:
        if api.address_of_call == call_address:
            return self._resolve_api_call(api)
    return []
350
def _resolve_api_call(self, call_context):
    """
    Resolve the parameters of an API call: the signature of the called API and the push instructions
    found before the call are determined and then matched against each other.
    @param call_context: the call context to get the parameter information for
    @type call_context: a CallContext data object
    @return: a list of ParameterContext data objects.
    """
    signature = self._get_api_signature(call_context.called_function_name)
    pushes = self._get_push_addresses_before_target_address(call_context.address_of_call)
    return self._match_push_addresses_to_signature(pushes, signature)
363
def _match_push_addresses_to_signature(self, push_addresses, api_signature):
    """
    Merge the output of I{_get_push_addresses_before_target_address} and I{_get_api_signature} into a list
    of ParameterContext data objects, one per parameter of the signature.
    @param push_addresses: the identified push addresses before a function call that shall be matched to a
                           function signature
    @type push_addresses: a list of int
    @param api_signature: information about a function definition with
                          parameter names, types, and so on.
    @type api_signature: a dictionary with the layout as returned by I{_get_api_signature}
    @return: a list of ParameterContext data objects.
    """
    # TODO:
    # upgrade this feature with data flow analysis to resolve parameters with higher precision
    signature_parameters = api_signature["parameters"]
    expected_count = len(signature_parameters)
    # only the pushes closest to the call are candidates
    candidate_pushes = push_addresses[-expected_count:]
    # TODO:
    # Fewer pushes than parameters may have been found. The (then negative) start offset shifts the
    # enumeration so that the last n parameters line up with the pushes; parameters that receive a
    # negative position have no push and are flagged as invalid. This is a temporary fix.
    start_offset = len(candidate_pushes) - expected_count
    contexts = []
    for position, description in enumerate(signature_parameters, start=start_offset):
        ctx = self.ParameterContext()
        ctx.parameter_type = description["type"]
        ctx.parameter_name = description["name"]
        if position >= 0:
            push_ea = candidate_pushes[position]
            ctx.push_address = push_ea
            ctx.ida_operand_type = self.ida_proxy.GetOpType(push_ea, 0)
            ctx.ida_operand_value = self.ida_proxy.GetOperandValue(push_ea, 0)
            ctx.value = ctx.ida_operand_value
        else:
            ctx.valid = False
        contexts.append(ctx)
    return contexts
399
def _get_api_signature(self, api_name):
    """
    Get the signature for a function by using IDA's I{GetType()}. The string is then parsed with a Regex and
    returned as a dictionary. If no type string is available or it cannot be parsed, a message is printed
    and the returned dictionary has an empty parameter list and no "return_type" entry.
    @param api_name: name of the API / function to get type information for
    @type api_name: str
    @return: a dictionary with key/value entries of the following form: ("api_name", str),
             ("return_type", str), ("parameters", [dict(("type", str), ("name", str))]).
             The parameters are listed in reversed declaration order.
    """
    api_signature = {"api_name": api_name, "parameters": []}
    api_location = self.ida_proxy.LocByName(api_name)
    type_def = self.ida_proxy.GetType(api_location)
    function_signature_regex = r"(?P<return_type>[\w\s\*]+)\((?P<parameters>[,\.\*\w\s]*)\)"
    # re.match() raises a TypeError for non-strings; a missing type string (presumably None when IDA
    # has no type information - verify against IdaProxy) is therefore handled like a failed match
    result = self.re.match(function_signature_regex, type_def) if type_def else None
    if result is not None:
        api_signature["return_type"] = result.group("return_type")
        for parameter in result.group("parameters").split(","):
            parameter = parameter.strip()
            if not parameter or parameter == "void":
                # "(void)" and empty entries do not describe a parameter
                continue
            # the last blank separates the type from the name
            param_type, _, param_name = parameter.rpartition(" ")
            if not param_type:
                # no blank at all: an unnamed parameter such as "int" or "..."
                param_type, param_name = param_name, ""
            api_signature["parameters"].append({"type": param_type.strip(), "name": param_name.strip()})
    else:
        print("SemanticIdentifier._get_api_signature: No API/function signature for \"%s\" @ 0x%x available."
              % (api_name, api_location))
    # TODO:
    # here should be a check for the calling convention
    # currently, the list is simply reversed to match the order parameters are pushed to the stack
    api_signature["parameters"].reverse()
    return api_signature
430
432 """ 433 Get the addresses of all push instructions in the basic block preceding the given address. 434 @param address: address to get the push addresses for. 435 @type address: int 436 @return: a list of int 437 """ 438 push_addresses = [] 439 function_chart = self.ida_proxy.FlowChart(self.ida_proxy.get_func(address)) 440 for block in function_chart: 441 if block.startEA <= address < block.endEA: 442 for instruction_addr in self.ida_proxy.Heads(block.startEA, block.endEA): 443 if self.ida_proxy.GetMnem(instruction_addr) == "push": 444 push_addresses.append(instruction_addr) 445 if instruction_addr >= address: 446 break 447 return push_addresses
448
def get_last_result(self):
    """
    Hand out the result stored by the most recent run of I{scan_by_references}. The stored dictionary
    itself is returned, not a copy.
    @return: a dictionary with key/value entries of the following form: (function_address, FunctionContext)
    """
    latest_scan_result = self.last_result
    return latest_scan_result
455
def print_last_result(self):
    """
    nicely print the last scan result (mostly used for debugging)

    For every function of the last result one line with its address, name and tags is printed, followed
    by one line per call context with the address of the call, the called function name and the tag.
    """
    for function_address, function_ctx in self.last_result.items():
        # the whole message is built inside the parentheses, so the same text is printed
        # no matter whether print is a statement or a function
        print(("0x%x - %s -> " % (function_address, self.ida_proxy.GetFunctionName(function_address)))
              + ", ".join(self.get_tags_for_function_address(function_address)))
        for call_ctx in function_ctx.call_contexts:
            print(" 0x%x - %s (%s)" % (call_ctx.address_of_call, call_ctx.called_function_name, call_ctx.tag))
465