Merge pull request #50 from EFForg/imsi-analyzer

IMSI provided + null cipher analyzer
This commit is contained in:
Cooper Quintin
2024-07-19 16:05:44 -07:00
committed by GitHub
7 changed files with 244 additions and 1 deletions
+3 -1
View File
@@ -4,7 +4,7 @@ use serde::Serialize;
use crate::{diag::MessagesContainer, gsmtap_parser};
use super::{information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer};
use super::{imsi_provided::ImsiProvidedAnalyzer, information_element::InformationElement, lte_downgrade::LteSib6And7DowngradeAnalyzer, null_cipher::NullCipherAnalyzer};
/// Qualitative measure of how severe a Warning event type is.
/// The levels should break down like this:
@@ -100,6 +100,8 @@ impl Harness {
pub fn new_with_all_analyzers() -> Self {
let mut harness = Harness::new();
harness.add_analyzer(Box::new(LteSib6And7DowngradeAnalyzer{}));
harness.add_analyzer(Box::new(ImsiProvidedAnalyzer{}));
harness.add_analyzer(Box::new(NullCipherAnalyzer{}));
harness
}
+37
View File
@@ -0,0 +1,37 @@
use std::borrow::Cow;
use telcom_parser::lte_rrc::{PCCH_MessageType, PCCH_MessageType_c1, PagingUE_Identity};
use super::analyzer::{Analyzer, Event, EventType, Severity};
use super::information_element::{InformationElement, LteInformationElement};
pub struct ImsiProvidedAnalyzer {
}
impl Analyzer for ImsiProvidedAnalyzer {
fn get_name(&self) -> Cow<str> {
Cow::from("IMSI Provided")
}
fn get_description(&self) -> Cow<str> {
Cow::from("Tests whether the UE's IMSI was ever provided to the cell")
}
fn analyze_information_element(&mut self, ie: &InformationElement) -> Option<Event> {
let InformationElement::LTE(LteInformationElement::PCCH(pcch_msg)) = ie else {
return None;
};
let PCCH_MessageType::C1(PCCH_MessageType_c1::Paging(paging)) = &pcch_msg.message else {
return None;
};
for record in &paging.paging_record_list.as_ref()?.0 {
if let PagingUE_Identity::Imsi(_) = record.ue_identity {
return Some(Event {
event_type: EventType::QualitativeWarning { severity: Severity::High },
message: "IMSI was provided to cell".to_string(),
})
}
}
None
}
}
+2
View File
@@ -1,3 +1,5 @@
pub mod analyzer;
pub mod information_element;
pub mod lte_downgrade;
pub mod imsi_provided;
pub mod null_cipher;
+115
View File
@@ -0,0 +1,115 @@
use std::borrow::Cow;
use telcom_parser::lte_rrc::{CipheringAlgorithm_r12, DL_CCCH_MessageType, DL_CCCH_MessageType_c1, DL_DCCH_MessageType, DL_DCCH_MessageType_c1, PCCH_MessageType, PCCH_MessageType_c1, PagingUE_Identity, RRCConnectionReconfiguration, RRCConnectionReconfigurationCriticalExtensions, RRCConnectionReconfigurationCriticalExtensions_c1, RRCConnectionReconfiguration_r8_IEs, RRCConnectionRelease_v890_IEs, SCG_Configuration_r12, SecurityConfigHO_v1530HandoverType_v1530, SecurityModeCommand, SecurityModeCommandCriticalExtensions, SecurityModeCommandCriticalExtensions_c1};
use super::analyzer::{Analyzer, Event, EventType, Severity};
use super::information_element::{InformationElement, LteInformationElement};
pub struct NullCipherAnalyzer {
}
impl NullCipherAnalyzer {
fn check_rrc_connection_reconfiguration_cipher(&self, reconfiguration: &RRCConnectionReconfiguration) -> bool {
let RRCConnectionReconfigurationCriticalExtensions::C1(c1) = &reconfiguration.critical_extensions else {
return false;
};
let RRCConnectionReconfigurationCriticalExtensions_c1::RrcConnectionReconfiguration_r8(c1) = c1 else {
return false;
};
if let Some(handover) = &c1.security_config_ho {
let maybe_security_config = match &handover.handover_type {
telcom_parser::lte_rrc::SecurityConfigHOHandoverType::IntraLTE(lte) => lte.security_algorithm_config.as_ref(),
telcom_parser::lte_rrc::SecurityConfigHOHandoverType::InterRAT(rat) => Some(&rat.security_algorithm_config),
};
if let Some(security_config) = maybe_security_config {
if security_config.ciphering_algorithm.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
}
}
// Use map/flatten to dig into a long chain of nested Option types
let maybe_v1250 = c1.non_critical_extension.as_ref()
.map(|v890| v890.non_critical_extension.as_ref()).flatten()
.map(|v920| v920.non_critical_extension.as_ref()).flatten()
.map(|v1020| v1020.non_critical_extension.as_ref()).flatten()
.map(|v1130| v1130.non_critical_extension.as_ref()).flatten();
let Some(v1250) = maybe_v1250 else {
return false;
};
if let Some(SCG_Configuration_r12::Setup(scg_setup)) = v1250.scg_configuration_r12.as_ref() {
let maybe_cipher = scg_setup.scg_config_part_scg_r12.as_ref()
.map(|scg| scg.mobility_control_info_scg_r12.as_ref()).flatten()
.map(|mci| mci.ciphering_algorithm_scg_r12.as_ref()).flatten();
if let Some(cipher) = maybe_cipher {
if cipher.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
}
}
let maybe_v1530_security_config = v1250.non_critical_extension.as_ref()
.map(|v1310| v1310.non_critical_extension.as_ref()).flatten()
.map(|v1430| v1430.non_critical_extension.as_ref()).flatten()
.map(|v1510| v1510.non_critical_extension.as_ref()).flatten()
.map(|v1530| v1530.security_config_ho_v1530.as_ref()).flatten();
let Some(v1530_security_config) = maybe_v1530_security_config else {
return false;
};
let maybe_security_algorithm = match &v1530_security_config.handover_type_v1530 {
SecurityConfigHO_v1530HandoverType_v1530::Intra5GC(intra_5gc) => intra_5gc.security_algorithm_config_r15.as_ref(),
SecurityConfigHO_v1530HandoverType_v1530::Fivegc_ToEPC(to_epc) => Some(&to_epc.security_algorithm_config_r15),
SecurityConfigHO_v1530HandoverType_v1530::Epc_To5GC(to_5gc) => Some(&to_5gc.security_algorithm_config_r15),
};
if let Some(security_algorithm) = maybe_security_algorithm {
if security_algorithm.ciphering_algorithm.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
}
false
}
fn check_security_mode_command_cipher(&self, command: &SecurityModeCommand) -> bool {
let SecurityModeCommandCriticalExtensions::C1(c1) = &command.critical_extensions else {
return false;
};
let SecurityModeCommandCriticalExtensions_c1::SecurityModeCommand_r8(r8) = &c1 else {
return false;
};
if r8.security_config_smc.security_algorithm_config.ciphering_algorithm.0 == CipheringAlgorithm_r12::EEA0 {
return true;
}
false
}
}
impl Analyzer for NullCipherAnalyzer {
fn get_name(&self) -> Cow<str> {
Cow::from("Null Cipher")
}
fn get_description(&self) -> Cow<str> {
Cow::from("Tests whether the cell suggests using a null cipher (EEA0)")
}
fn analyze_information_element(&mut self, ie: &InformationElement) -> Option<Event> {
let InformationElement::LTE(LteInformationElement::DlDcch(dcch_msg)) = ie else {
return None;
};
let DL_DCCH_MessageType::C1(c1) = &dcch_msg.message else {
return None;
};
let null_cipher_detected = match c1 {
DL_DCCH_MessageType_c1::RrcConnectionReconfiguration(reconfiguration) => self.check_rrc_connection_reconfiguration_cipher(reconfiguration),
DL_DCCH_MessageType_c1::SecurityModeCommand(command) => self.check_security_mode_command_cipher(command),
_ => return None,
};
if null_cipher_detected {
return Some(Event {
event_type: EventType::QualitativeWarning { severity: Severity::High },
message: "Cell suggested use of null cipher".to_string(),
});
}
None
}
}
+16
View File
@@ -0,0 +1,16 @@
## Rayhunter tools
### `asn1grep.py`: a script for finding a datatype in ASN.1 files
`asn1grep` parses our ASN.1 spec files, then searches for a given datatype by recursively descending through the LTE-RRC types we care about. it then prints out each result as a "path" through the highly nested datatypes.
Setup:
1. `python -m venv .venv && . .venv/bin/activate`
2. `pip install -r requirements.txt`
Usage:
```
» python asn1grep.py IMSI
searching for IMSI
PCCH-Message [message [message.c1 [c1 [c1.paging [paging [pagingRecordList[0] [ [ue-Identity [ue-Identity.imsi [IMSI]]]]]]]]]]
```
+67
View File
@@ -0,0 +1,67 @@
import asn1tools
import sys
ASN_FILES = [
'../telcom-parser/specs/PC5-RRC-Definitions.asn',
'../telcom-parser/specs/EUTRA-RRC-Definitions.asn',
]
TERMINATING_TYPE_NAMES = [
'DL-CCCH-Message',
'DL-DCCH-Message',
'UL-CCCH-Message',
'UL-DCCH-Message',
'BCCH-BCH-Message',
'BCCH-DL-SCH-Message',
'PCCH-Message',
'MCCH-Message',
'SC-MCCH-Message-r13',
'BCCH-BCH-Message-MBMS',
'BCCH-DL-SCH-Message-BR',
'BCCH-DL-SCH-Message-MBMS',
'SBCCH-SL-BCH-Message',
'SBCCH-SL-BCH-Message-V2X-r14',
]
def load_asn():
return asn1tools.compile_files(ASN_FILES, cache_dir=".cache")
def get_terminating_types(rrc_asn):
return [rrc_asn.types[name] for name in TERMINATING_TYPE_NAMES]
def search_type(haystack, needle):
if haystack.type_name == needle or haystack.name == needle:
return [needle]
result = []
if 'members' in haystack.__dict__:
for name, member in haystack.name_to_member.items():
for member_result in search_type(member, needle):
result.append(f"{haystack.name} ({haystack.type_name}).{name}\n {member_result}")
elif 'root_members' in haystack.__dict__:
for member in haystack.root_members:
for member_result in search_type(member, needle):
result.append(f"{haystack.name} ({haystack.type_name})\n {member_result}")
elif 'element_type' in haystack.__dict__:
for element_result in search_type(haystack.element_type, needle):
result.append(f"{haystack.name}[0] ({haystack.type_name})\n {element_result}")
elif 'inner' in haystack.__dict__:
for inner_result in search_type(haystack.inner, needle):
result.append(inner_result)
return result
if __name__ == "__main__":
type_name = sys.argv[1]
print(f"searching for {type_name}")
rrc_asn = load_asn()
terminating_types = get_terminating_types(rrc_asn)
needle = rrc_asn.types.get(type_name)
if needle == None:
raise ValueError(f"couldn't find type {type}")
for haystack in terminating_types:
for result in search_type(haystack.type, type_name):
print(result + '\n')
+4
View File
@@ -0,0 +1,4 @@
asn1tools==0.166.0
bitstruct==8.19.0
diskcache==5.6.3
pyparsing==3.1.2