--- a/src/SFtp.cc +++ a/src/SFtp.cc @@ -299,12 +299,7 @@ void SFtp::MoveConnectionHere(SFtp *o) recv_translate=o->recv_translate.borrow(); send_translate=o->send_translate.borrow(); rate_limit=o->rate_limit.borrow(); - expect_queue_size=o->expect_queue_size; o->expect_queue_size=0; - expect_chain=o->expect_chain; o->expect_chain=0; - expect_chain_end=o->expect_chain_end; - if(expect_chain_end==&o->expect_chain) - expect_chain_end=&expect_chain; - o->expect_chain_end=&o->expect_chain; + expect_queue.move_here(o->expect_queue); timeout_timer.Reset(o->timeout_timer); ssh_id=o->ssh_id; state=CONNECTED; @@ -337,10 +332,6 @@ void SFtp::Init() eof=false; received_greeting=false; password_sent=0; - expect_queue_size=0; - expect_chain=0; - expect_chain_end=&expect_chain; - ooo_chain=0; protocol_version=0; send_translate=0; recv_translate=0; @@ -693,8 +684,7 @@ void SFtp::Close() CloseHandle(Expect::IGNORE); super::Close(); // don't need these out-of-order packets anymore - while(ooo_chain) - DeleteExpect(&ooo_chain); + ooo_chain.truncate(); if(recv_buf) recv_buf->Resume(); } @@ -796,7 +786,7 @@ void SFtp::HandleExpect(Expect *e) SetError(NO_FILE,strerror(ENOTDIR)); break; } - if(mode==CHANGE_DIR && RespQueueIsEmpty()) + if(mode==CHANGE_DIR && GetExpectCount(Expect::CWD)==0) { cwd.Set(file); eof=true; @@ -869,11 +859,15 @@ void SFtp::HandleExpect(Expect *e) } else { - if(e->next!=ooo_chain) - LogNote(9,"put a packet with id=%d on out-of-order chain (need_pos=%lld packet_pos=%lld)", - reply->GetID(),(long long)(pos+file_buf->Size()),(long long)r->pos); - e->next=ooo_chain; - ooo_chain=e; + LogNote(9,"put a packet with id=%d on out-of-order chain (need_pos=%lld packet_pos=%lld)", + reply->GetID(),(long long)(pos+file_buf->Size()),(long long)r->pos); + if(ooo_chain.count()>=64) + { + LogError(0,"Too many out-of-order packets"); + Disconnect(); + return; + } + ooo_chain.append(e); return; } } @@ -925,7 +919,7 @@ void SFtp::HandleExpect(Expect *e) LogNote(9,"eof"); eof=true; state=DONE; - if(file_buf && !ooo_chain) + if(file_buf && ooo_chain.count()==0) file_buf->PutEOF(); break; } @@ -1006,22 +1000,20 @@ int SFtp::HandleReplies() if(!recv_buf) return MOVED; - int i=0; - Expect *ooo_scan=ooo_chain; - while(ooo_scan) - { - Expect *next=ooo_scan->next; - ooo_chain=next; - HandleExpect(ooo_scan); - ooo_scan=next; - if(++i>64) - { - LogError(0,"Too many out-of-order packets"); - Disconnect(); - return MOVED; + if(file_buf) { + off_t need_pos=pos+file_buf->Size(); + // there are usually a few of out-of-order packets, no need for fast search + for(int i=0; ihas_data_at_pos(need_pos)) { + Expect *e=ooo_chain[i]; + ooo_chain[i]=0; // to keep the Expect + ooo_chain.remove(i); + HandleExpect(e); + } } } - if(!ooo_chain && eof && file_buf && !file_buf->Eof()) + + if(ooo_chain.count()==0 && eof && file_buf && !file_buf->Eof()) file_buf->PutEOF(); if(recv_buf->Size()<4) @@ -1066,51 +1058,20 @@ int SFtp::HandleReplies() HandleExpect(e); return MOVED; } -SFtp::Expect **SFtp::FindExpect(Packet *p) -{ - unsigned id=p->GetID(); - for(Expect **scan=&expect_chain; *scan; scan=&scan[0]->next) - { - if(scan[0]->request->GetID()==id) - { - assert(!scan[0]->reply); - scan[0]->reply=p; - return scan; - } - } - return 0; -} void SFtp::PushExpect(Expect *e) { - e->next=*expect_chain_end; - *expect_chain_end=e; - expect_chain_end=&e->next; - expect_queue_size++; -} -void SFtp::DeleteExpect(Expect **e) -{ - if(expect_chain_end==&e[0]->next) - expect_chain_end=e; - Expect *d=*e; - *e=e[0]->next; - delete d; - expect_queue_size--; + expect_queue.add(e->request->GetKey(),e); } SFtp::Expect *SFtp::FindExpectExclusive(Packet *p) { - Expect **e=FindExpect(p); - if(!e || !*e) - return 0; - Expect *res=*e; - if(expect_chain_end==&res->next) - expect_chain_end=e; - *e=res->next; - expect_queue_size--; - return res; + Expect *e=expect_queue.borrow(p->GetKey()); + if(e) + e->reply=p; + return e; } void SFtp::CloseExpectQueue() { - for(Expect *e=expect_chain; e; e=e->next) + for(Expect *e=expect_queue.each_begin(); e; e=expect_queue.each_next()) { switch(e->tag) { @@ -1133,6 +1094,14 @@ void SFtp::CloseExpectQueue() } } +int SFtp::GetExpectCount(Expect::expect_t tag) +{ + int count=0; + for(Expect *e=expect_queue.each_begin(); e; e=expect_queue.each_next()) + count+=(e->tag==tag); + return count; +} + Glob *SFtp::MakeGlob(const char *pat) { return new GenericGlob(this,pat); --- a/src/SFtp.h +++ a/src/SFtp.h @@ -28,6 +28,7 @@ #include #include #include "FileSet.h" +#include "xmap.h" class SFtp : public SSH_Access { @@ -291,6 +292,7 @@ public: packet_type GetPacketType() { return type; } const char *GetPacketTypeText(); unsigned GetID() const { return id; } + const xstring& GetKey() { return xstring::get_tmp((const char*)&id,sizeof(id)); } void SetID(unsigned new_id) { id=new_id; } void DropData(Buffer *b) { b->Skip(4+(length>0?length:0)); } bool TypeIs(packet_type t) const { return type==t; } @@ -674,10 +676,15 @@ private: Ref request; Ref reply; - Expect *next; int i; expect_t tag; Expect(Packet *req,expect_t t,int j=0) : request(req), i(j), tag(t) {} + + bool has_data_at_pos(off_t pos) const { + if(!reply->TypeIs(SSH_FXP_DATA) || !request->TypeIs(SSH_FXP_READ)) + return false; + return request.Cast()->pos==pos; + } }; void PushExpect(Expect *); @@ -685,26 +692,22 @@ private: int HandlePty(); void HandleExpect(Expect *); void CloseExpectQueue(); + int GetExpectCount(Expect::expect_t tag); void CloseHandle(Expect::expect_t e); int ReplyLogPriority(int); - int expect_queue_size; - Expect *expect_chain; - Expect **expect_chain_end; - Expect **FindExpect(Packet *reply); - void DeleteExpect(Expect **); - Expect *FindExpectExclusive(Packet *reply); - Expect *ooo_chain; // out of order replies buffered + xmap_p expect_queue; + const xstring& expect_key(unsigned id); - int RespQueueIsEmpty() { return expect_chain==0; } - int RespQueueSize() { return expect_queue_size; } - void EmptyRespQueue() - { - while(expect_chain) - DeleteExpect(&expect_chain); - while(ooo_chain) - DeleteExpect(&ooo_chain); - } + Expect *FindExpectExclusive(Packet *reply); + xarray_p ooo_chain; // out of order replies buffered + + int RespQueueSize() const { return expect_queue.count(); } + int RespQueueIsEmpty() const { return RespQueueSize()==0; } + void EmptyRespQueue() { + expect_queue.empty(); + ooo_chain.truncate(); + } bool GetBetterConnection(int level,bool limit_reached); void MoveConnectionHere(SFtp *o); --- a/src/xmap.cc +++ a/src/xmap.cc @@ -133,8 +137,9 @@ void _xmap::_remove(entry **ep) if(!ep || !*ep) return; entry *e=*ep; + e->key.unset(); *ep=e->next; - delete e; + xfree(e); entry_count--; } --- a/src/xmap.h +++ b/src/xmap.h @@ -103,6 +103,7 @@ template T xmap::zero; template class xmap_p : public _xmap { +void dispose(T *p) { delete p; } public: xmap_p() : _xmap(sizeof(T*)) {} ~xmap_p() { @@ -136,7 +137,7 @@ public: } void add(const xstring& key,T *e0) { entry *e=_add(key); - delete(payload(e)); + dispose(payload(e)); payload_Lv(e)=e0; } void add(const char *key,T *e0) { add(xstring::get_tmp(key),e0); } @@ -148,7 +148,7 @@ public: void empty() { for(int i=0; i