Files
dpi/protocol/detect_ssh_test.go
2025-10-09 15:37:17 +02:00

154 lines
3.9 KiB
Go

package protocol
import (
"testing"
)
// TestDetectSSH runs a table of SSH and non-SSH payloads through testRunner
// with detectSSH stored as the only entry in atomicFormats.
//
// SSH identification strings (with or without preceding free-text lines) are
// expected to be reported as ProtocolSSH with a confidence of 0.95, while the
// MySQL and HTTP payloads are expected to fail with ErrUnknown.
func TestDetectSSH(t *testing.T) {
	// Store a format list holding only the SSH detector, so no other detector
	// in the package takes part in these cases.
	// NOTE(review): the previous value is not restored when the test ends —
	// confirm other tests in the package set atomicFormats themselves.
	atomicFormats.Store([]format{{Both, "", detectSSH}})

	// A standard OpenSSH identification string.
	openSSHBanner := []byte("SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4\r\n")

	// An SSH identification string preceded by a multi-line legal notice.
	preBannerSSH := []byte(
		"*******************************************************************\r\n" +
			"* W A R N I N G *\r\n" +
			"* This system is for the use of authorized users only. *\r\n" +
			"*******************************************************************\r\n" +
			"SSH-2.0-OpenSSH_7.6p1\r\n",
	)

	// A different SSH implementation (Dropbear).
	dropbearBanner := []byte("SSH-2.0-dropbear_2020.81\r\n")

	// Not SSH: bytes shaped like the start of a MySQL server greeting
	// (0x0a, then the NUL-terminated version string "8.0.32", then four
	// more bytes).
	mysqlBanner := []byte{
		0x0a, 0x38, 0x2e, 0x30, 0x2e, 0x33, 0x32, 0x00, 0x0d, 0x00, 0x00, 0x00,
	}

	// Not SSH: a simple HTTP/1.1 request.
	httpBanner := []byte("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")

	testRunner(t, []*testCase{
		{
			Name:           "OpenSSH client",
			Direction:      Client,
			Data:           openSSHBanner,
			DstPort:        22,
			WantProto:      ProtocolSSH,
			WantConfidence: 0.95,
		},
		{
			Name:           "OpenSSH server",
			Direction:      Server,
			Data:           openSSHBanner,
			SrcPort:        22,
			WantProto:      ProtocolSSH,
			WantConfidence: 0.95,
		},
		{
			Name:           "OpenSSH server with banner",
			Direction:      Server,
			Data:           preBannerSSH,
			SrcPort:        22,
			WantProto:      ProtocolSSH,
			WantConfidence: 0.95,
		},
		{
			Name:           "Dropbear server",
			Direction:      Server,
			Data:           dropbearBanner,
			SrcPort:        22,
			WantProto:      ProtocolSSH,
			WantConfidence: 0.95,
		},
		{
			Name:      "Invalid MySQL",
			Direction: Server,
			Data:      mysqlBanner,
			SrcPort:   3306,
			WantError: ErrUnknown,
		},
		{
			Name:      "Invalid HTTP",
			Direction: Client,
			Data:      httpBanner,
			DstPort:   80,
			WantError: ErrUnknown,
		},
	})
}