Fast coarse AMG comms: rewrite with maps

This commit is contained in:
Hrvoje Jasak 2016-05-31 19:26:15 +01:00
parent bccb28bf05
commit 262b02c5fb
6 changed files with 172 additions and 454 deletions

View file

@ -44,42 +44,153 @@ namespace Foam
void Foam::ggiAMGInterface::initFastReduce() const
{
if (!Pstream::parRun())
{
FatalErrorIn("void ggiAMGInterface::initFastReduce() const")
<< "Requested calculation of send-receive addressing for a "
<< "serial run. This is not allowed"
<< abort(FatalError);
}
// Init should be executed only once
initReduce_ = true;
// Note: this is different from ggiPolyPatch comms because the zone
// is the same on the master and slave sides.
// HJ, 31/May/2016
// Establish parallel comms pattern
if (!localParallel() && Pstream::parRun())
// Get addressing
const labelList& za = zoneAddressing();
const labelList& shadowZa = shadowInterface().zoneAddressing();
// Make a zone-sized field and fill it in with proc markings for processor
// that holds and requires the data
labelField zoneProcID(zoneSize(), -1);
forAll (za, zaI)
{
// Only master handles communication
if (Pstream::master())
zoneProcID[za[zaI]] = Pstream::myProcNo();
}
reduce(zoneProcID, maxOp<labelField>());
// Find out where my zone data is coming from
labelList nRecv(Pstream::nProcs(), 0);
forAll (shadowZa, shadowZaI)
{
nRecv[zoneProcID[shadowZa[shadowZaI]]]++;
}
// Make a receiving sub-map
// It tells me which data I will receive from which processor and
// where I need to put it into the remoteZone data before the mapping
labelListList constructMap(Pstream::nProcs());
// Size the receiving list
forAll (nRecv, procI)
{
constructMap[procI].setSize(nRecv[procI]);
}
// Reset counters for processors
nRecv = 0;
forAll (shadowZa, shadowZaI)
{
label recvProc = zoneProcID[shadowZa[shadowZaI]];
constructMap[recvProc][nRecv[recvProc]] = shadowZa[shadowZaI];
nRecv[recvProc]++;
}
// Make the sending sub-map
// It tells me which data is required from me to be sent to which
// processor
// Algorithm
// - expand the local zone face indices into a field the size of the
//   local zone
// - go through the remote zone addressing on all processors
// - find out who hits my faces
labelList localZoneIndices(zoneSize(), -1);
forAll (za, zaI)
{
localZoneIndices[za[zaI]] = zaI;
}
labelListList shadowToReceiveAddr(Pstream::nProcs());
// Get the list of what my shadow needs to receive from my zone
// on all other processors
shadowToReceiveAddr[Pstream::myProcNo()] = shadowZa;
Pstream::gatherList(shadowToReceiveAddr);
Pstream::scatterList(shadowToReceiveAddr);
// Now local zone indices contain the index of a local face that will
// provide the data. For faces that are not local, the index will be -1
// Find out where my zone data is going to
// Make a sending sub-map
// It tells me which data I will send to which processor
labelListList sendMap(Pstream::nProcs());
// Collect local labels to be sent to each processor
forAll (shadowToReceiveAddr, procI)
{
const labelList& curProcSend = shadowToReceiveAddr[procI];
// Find out how much of my data is going to this processor
label nProcSend = 0;
forAll (curProcSend, sendI)
{
receiveAddr_.setSize(Pstream::nProcs());
sendAddr_.setSize(Pstream::nProcs());
receiveAddr_[0] = zoneAddressing();
for (label procI = 1; procI < Pstream::nProcs(); procI++)
if (localZoneIndices[curProcSend[sendI]] > -1)
{
// Note: must use normal comms because the size of the
// communicated lists is unknown on the receiving side
// HJ, 4/Jun/2011
// Opt: reconsider mode of communication
IPstream ip(Pstream::scheduled, procI);
receiveAddr_[procI] = labelList(ip);
sendAddr_[procI] = labelList(ip);
nProcSend++;
}
}
else
{
// Opt: reconsider mode of communication
OPstream op(Pstream::scheduled, Pstream::masterNo());
op << zoneAddressing() << shadowInterface().zoneAddressing();
if (nProcSend > 0)
{
// Collect the indices
labelList& curSendMap = sendMap[procI];
curSendMap.setSize(nProcSend);
// Reset counter
nProcSend = 0;
forAll (curProcSend, sendI)
{
if (localZoneIndices[curProcSend[sendI]] > -1)
{
curSendMap[nProcSend] =
localZoneIndices[curProcSend[sendI]];
nProcSend++;
}
}
}
}
// The map will return an object sized to the remote zone
// HJ, 9/May/2016
mapPtr_ = new mapDistribute(zoneSize(), sendMap, constructMap);
}
//- Return the mapDistribute comms tool.
//  Demand-driven: mapPtr_ is built by initFastReduce() on first access
//  and cached for subsequent calls.
const Foam::mapDistribute& Foam::ggiAMGInterface::map() const
{
if (!mapPtr_)
{
// Calculate send/receive addressing and construct the map
initFastReduce();
}
return *mapPtr_;
}
@ -98,8 +209,7 @@ Foam::ggiAMGInterface::ggiAMGInterface
zoneSize_(0),
zoneAddressing_(),
initReduce_(false),
receiveAddr_(),
sendAddr_()
mapPtr_(NULL)
{
// Note.
// All processors will do the same coarsening and then filter
@ -598,7 +708,9 @@ Foam::ggiAMGInterface::ggiAMGInterface
// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
Foam::ggiAMGInterface::~ggiAMGInterface()
{}
{
deleteDemandDrivenData(mapPtr_);
}
// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //

View file

@ -40,6 +40,7 @@ SourceFiles
#include "AMGInterface.H"
#include "ggiLduInterface.H"
#include "mapDistribute.H"
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
@ -73,11 +74,8 @@ class ggiAMGInterface
//- Has reduce been initialised?
mutable bool initReduce_;
//- List of zone faces indices received from each processor
mutable labelListList receiveAddr_;
//- List of zone faces indices to send to each processor
mutable labelListList sendAddr_;
//- Map-distribute comms tool
mutable mapDistribute* mapPtr_;
// Private Member Functions
@ -88,9 +86,13 @@ class ggiAMGInterface
//- Disallow default bitwise assignment
void operator=(const ggiAMGInterface&);
//- Init fast reduce
void initFastReduce() const;
//- Return mapDistribute
const mapDistribute& map() const;
// Private static data

View file

@ -64,138 +64,37 @@ tmp<Field<Type> > ggiAMGInterface::fastReduce(const UList<Type>& ff) const
return tresult;
}
// Execute reduce if not already done
if (!initReduce_)
{
initFastReduce();
}
if (Pstream::master())
{
// Master collects information and distributes data.
Field<Type> expandField(zoneSize(), pTraits<Type>::zero);
// Insert master processor
const labelList& za = zoneAddressing();
forAll (za, i)
{
expandField[za[i]] = ff[i];
}
// Master receives and inserts data from all processors for which
// receiveAddr contains entries
for (label procI = 1; procI < Pstream::nProcs(); procI++)
{
const labelList& curRAddr = receiveAddr_[procI];
if (!curRAddr.empty())
{
Field<Type> receiveBuf(curRAddr.size());
// Opt: reconsider mode of communication
IPstream::read
(
Pstream::blocking,
procI,
reinterpret_cast<char*>(receiveBuf.begin()),
receiveBuf.byteSize()
);
// Insert received information
forAll (curRAddr, i)
{
expandField[curRAddr[i]] = receiveBuf[i];
}
}
}
// Expanded field complete, send required data to other processors
for (label procI = 1; procI < Pstream::nProcs(); procI++)
{
const labelList& curSAddr = sendAddr_[procI];
if (!curSAddr.empty())
{
Field<Type> sendBuf(curSAddr.size());
forAll (curSAddr, i)
{
sendBuf[i] = expandField[curSAddr[i]];
}
// Opt: reconsider mode of communication
OPstream::write
(
Pstream::blocking,
procI,
reinterpret_cast<const char*>(sendBuf.begin()),
sendBuf.byteSize()
);
}
}
// Note: different from ggi patch: field reduction happens within
// fastReduce. HJ, 26/Jun/2011
const labelList& sza = shadowInterface().zoneAddressing();
tmp<Field<Type> > tredField
(
new Field<Type>(sza.size(), pTraits<Type>::zero)
);
Field<Type>& redField = tredField();
// Select elements from shadow zone addressing
forAll (sza, i)
{
redField[i] = expandField[sza[i]];
}
return tredField;
}
else
{
// Send local data to master and receive remote data
// If patch is empty, communication is avoided
// HJ, 4/Jun/2011
if (size())
// Optimised mapDistribute
// Execute init reduce to calculate addressing if not already done
if (!initReduce_)
{
// Opt: reconsider mode of communication
OPstream::write
(
Pstream::blocking,
Pstream::masterNo(),
reinterpret_cast<const char*>(ff.begin()),
ff.byteSize()
);
initFastReduce();
}
// Prepare to receive remote data
const labelList& sza = shadowInterface().zoneAddressing();
// Prepare for distribute: field will be expanded to zone size
List<Type> expand = ff;
tmp<Field<Type> > treceiveBuf
map().distribute(expand);
const labelList& shadowZa = shadowInterface().zoneAddressing();
// Prepare return field: zone size
tmp<Field<Type> > tresult
(
new Field<Type>(sza.size(), pTraits<Type>::zero)
new Field<Type>(shadowZa.size())
);
Field<Type>& receiveBuf = treceiveBuf();
Field<Type>& result = tresult();
if (!sza.empty())
// Filter from expanded field to zone size
forAll (shadowZa, shadowZaI)
{
// Opt: reconsider mode of communication
IPstream::read
(
Pstream::blocking,
Pstream::masterNo(),
reinterpret_cast<char*>(receiveBuf.begin()),
receiveBuf.byteSize()
);
// Note: different from ggi patch: field reduction happens within
// fastReduce. HJ, 26/Jun/2011
result[shadowZaI] = expand[shadowZa[shadowZaI]];
}
return treceiveBuf;
return tresult;
}
}

View file

@ -362,7 +362,7 @@ void Foam::ggiPolyPatch::calcSendReceive() const
// (of the calc-call) they will be set to zero-sized array
// HJ, 4/Jun/2011
if (receiveAddrPtr_ || sendAddrPtr_ || mapPtr_)
if (mapPtr_)
{
FatalErrorIn("void ggiPolyPatch::calcSendReceive() const")
<< "Send-receive addressing already calculated"
@ -388,29 +388,6 @@ void Foam::ggiPolyPatch::calcSendReceive() const
// Get patch-to-zone addressing
const labelList& za = zoneAddressing();
// Get remote patch-to-zone addressing
const labelList& rza = remoteZoneAddressing();
receiveAddrPtr_ = new labelListList(Pstream::nProcs());
labelListList& rAddr = *receiveAddrPtr_;
sendAddrPtr_ = new labelListList(Pstream::nProcs());
labelListList& sAddr = *sendAddrPtr_;
rAddr[Pstream::myProcNo()] = za;
sAddr[Pstream::myProcNo()] = rza;
// Perform gather operation to master
Pstream::gatherList(rAddr);
Pstream::gatherList(sAddr);
// Perform scatter operation to master
Pstream::scatterList(rAddr);
Pstream::scatterList(sAddr);
// Zone addressing and remote zone addressing is fixed. Determine the
// parallel communications pattern
// Make a zone-sized field and fill it in with proc markings for processor
// that holds and requires the data
labelField zoneProcID(zone().size(), -1);
@ -457,7 +434,7 @@ void Foam::ggiPolyPatch::calcSendReceive() const
}
// Make the sending sub-map
// It tells me which data is required from me to be send to which
// It tells me which data is required from me to be sent to which
// processor
// Algorithm
@ -532,28 +509,6 @@ void Foam::ggiPolyPatch::calcSendReceive() const
}
const Foam::labelListList& Foam::ggiPolyPatch::receiveAddr() const
{
if (!receiveAddrPtr_)
{
calcSendReceive();
}
return *receiveAddrPtr_;
}
const Foam::labelListList& Foam::ggiPolyPatch::sendAddr() const
{
if (!sendAddrPtr_)
{
calcSendReceive();
}
return *sendAddrPtr_;
}
const Foam::mapDistribute& Foam::ggiPolyPatch::map() const
{
if (!mapPtr_)
@ -575,8 +530,6 @@ void Foam::ggiPolyPatch::clearGeom() const
// HJ, 23/Jun/2011
deleteDemandDrivenData(remoteZoneAddressingPtr_);
deleteDemandDrivenData(receiveAddrPtr_);
deleteDemandDrivenData(sendAddrPtr_);
deleteDemandDrivenData(mapPtr_);
}
@ -617,8 +570,6 @@ Foam::ggiPolyPatch::ggiPolyPatch
remoteZoneAddressingPtr_(NULL),
reconFaceCellCentresPtr_(NULL),
localParallelPtr_(NULL),
receiveAddrPtr_(NULL),
sendAddrPtr_(NULL),
mapPtr_(NULL)
{}
@ -648,8 +599,6 @@ Foam::ggiPolyPatch::ggiPolyPatch
remoteZoneAddressingPtr_(NULL),
reconFaceCellCentresPtr_(NULL),
localParallelPtr_(NULL),
receiveAddrPtr_(NULL),
sendAddrPtr_(NULL),
mapPtr_(NULL)
{}
@ -674,8 +623,6 @@ Foam::ggiPolyPatch::ggiPolyPatch
remoteZoneAddressingPtr_(NULL),
reconFaceCellCentresPtr_(NULL),
localParallelPtr_(NULL),
receiveAddrPtr_(NULL),
sendAddrPtr_(NULL),
mapPtr_(NULL)
{
if (dict.found("quickReject"))
@ -706,8 +653,6 @@ Foam::ggiPolyPatch::ggiPolyPatch
remoteZoneAddressingPtr_(NULL),
reconFaceCellCentresPtr_(NULL),
localParallelPtr_(NULL),
receiveAddrPtr_(NULL),
sendAddrPtr_(NULL),
mapPtr_(NULL)
{}
@ -734,8 +679,6 @@ Foam::ggiPolyPatch::ggiPolyPatch
remoteZoneAddressingPtr_(NULL),
reconFaceCellCentresPtr_(NULL),
localParallelPtr_(NULL),
receiveAddrPtr_(NULL),
sendAddrPtr_(NULL),
mapPtr_(NULL)
{}
@ -915,7 +858,7 @@ void Foam::ggiPolyPatch::initAddressing()
if (Pstream::parRun() && !localParallel())
{
// Calculate send addressing
sendAddr();
map();
}
}
@ -990,7 +933,7 @@ void Foam::ggiPolyPatch::initMovePoints(const pointField& p)
if (Pstream::parRun() && !localParallel())
{
sendAddr();
map();
}
}

View file

@ -105,12 +105,6 @@ class ggiPolyPatch
// Used for parallel optimisation
mutable bool* localParallelPtr_;
//- List of zone faces indices received from each processor
mutable labelListList* receiveAddrPtr_;
//- List of zone faces indices to send to each processor
mutable labelListList* sendAddrPtr_;
//- Map-distribute comms tool
mutable mapDistribute* mapPtr_;
@ -144,13 +138,6 @@ class ggiPolyPatch
//- Calculate send and receive addressing
void calcSendReceive() const;
//- Return receive addressing
const labelListList& receiveAddr() const;
//- Return send addressing
const labelListList& sendAddr() const;
//- Return mapDistribute
const mapDistribute& map() const;

View file

@ -64,20 +64,7 @@ Foam::tmp<Foam::Field<Type> > Foam::ggiPolyPatch::fastExpand
<< abort(FatalError);
}
// Algorithm:
// 1) Master processor holds maps of all zone addressing (data provided)
// and all remote zone addressing (data required)
// 2) Each processor will send the locally active data to the master
// 3) Master assembles all the data
// 4) Master sends to all processors the data they need to receive
//
// Notes:
// A) If the size of zone addressing is zero, data is not sent
// B) Communicated data on each processor has the size of live faces
// C) Expanded data will be equal to actual data from other processors
// only for the faces marked in remote; for other faces, it will be
// equal to zero
// D) On processor zero, complete data is available
// Replaced old comms algorithm. HJ, 31/May/2016
// HJ, 4/Jun/2011
// HR, 10/Jul/2013
@ -85,12 +72,10 @@ Foam::tmp<Foam::Field<Type> > Foam::ggiPolyPatch::fastExpand
// symmetric across processors. Hence trigger re-calculate at this point
if (Pstream::parRun() && !localParallel())
{
receiveAddr();
shadow().receiveAddr();
map();
shadow().map();
}
#if 1
// New version: mapDistribute
if (Pstream::parRun())
@ -127,216 +112,6 @@ Foam::tmp<Foam::Field<Type> > Foam::ggiPolyPatch::fastExpand
return texpandField;
}
#else
// Variant 2: global gather-scatter. Original version
// Note: expandField is filled with nans in unused part
// HJ, 25/May/2016
tmp<Field<Type> > texpandField
(
new Field<Type>(zone().size())
);
Field<Type>& expandField = texpandField();
if (Pstream::master())
{
// Insert master processor
const labelList& za = zoneAddressing();
forAll (za, i)
{
expandField[za[i]] = ff[i];
}
// Master receives and inserts data from all processors for which
// receiveAddr contains entries
for (label procI = 1; procI < Pstream::nProcs(); procI++)
{
const labelList& curRAddr = receiveAddr()[procI];
if (!curRAddr.empty())
{
Field<Type> receiveBuf(curRAddr.size());
// Opt: reconsider mode of communication
IPstream::read
(
Pstream::blocking,
procI,
reinterpret_cast<char*>(receiveBuf.begin()),
receiveBuf.byteSize()
);
// Insert received information
forAll (curRAddr, i)
{
expandField[curRAddr[i]] = receiveBuf[i];
}
}
}
// Expanded field complete, send required data to other processors
for (label procI = 1; procI < Pstream::nProcs(); procI++)
{
const labelList& curSAddr = shadow().sendAddr()[procI];
if (!curSAddr.empty())
{
Field<Type> sendBuf(curSAddr.size());
forAll (curSAddr, i)
{
sendBuf[i] = expandField[curSAddr[i]];
}
// Opt: reconsider mode of communication
OPstream::write
(
Pstream::blocking,
procI,
reinterpret_cast<const char*>(sendBuf.begin()),
sendBuf.byteSize()
);
}
}
}
else
{
// Send local data to master and receive remote data
// If patch is empty, communication is avoided
// HJ, 4/Jun/2011
if (size())
{
// Opt: reconsider mode of communication
OPstream::write
(
Pstream::blocking,
Pstream::masterNo(),
reinterpret_cast<const char*>(ff.begin()),
ff.byteSize()
);
}
// Prepare to receive remote data
const labelList& rza = shadow().remoteZoneAddressing();
if (!rza.empty())
{
Field<Type> receiveBuf(rza.size());
// Opt: reconsider mode of communication
IPstream::read
(
Pstream::blocking,
Pstream::masterNo(),
reinterpret_cast<char*>(receiveBuf.begin()),
receiveBuf.byteSize()
);
// Insert the data into expanded field
forAll (rza, i)
{
expandField[rza[i]] = receiveBuf[i];
}
}
}
return texpandField;
// #else
// // Variant 3: unpacked mapDistribute
// if (Pstream::parRun())
// {
// // Send subField to other processors
// for (label domainI = 0; domainI < Pstream::nProcs(); domainI++)
// {
// const labelList& curMap = map().subMap()[domainI];
// if (domainI != Pstream::myProcNo() && curMap.size())
// {
// Pout<< "Sending " << curMap.size()
// << " from " << Pstream::myProcNo()
// << " to " << domainI
// << endl;
// OPstream toNbr(Pstream::blocking, domainI);
// toNbr << List<Type>(UIndirectList<Type>(ff, curMap));
// }
// }
// // Subset myself
// {
// const labelList& mySubMap = map().subMap()[Pstream::myProcNo()];
// List<Type> subField(mySubMap.size());
// forAll (mySubMap, i)
// {
// subField[i] = ff[mySubMap[i]];
// }
// // Receive sub field from myself (subField)
// const labelList& curMap =
// map().constructMap()[Pstream::myProcNo()];
// forAll (curMap, i)
// {
// expandField[curMap[i]] = subField[i];
// }
// }
// // Receive sub field from neighbour
// for (label domainI = 0; domainI < Pstream::nProcs(); domainI++)
// {
// const labelList& curMap = map().constructMap()[domainI];
// if (domainI != Pstream::myProcNo() && curMap.size())
// {
// IPstream fromNbr(Pstream::blocking, domainI);
// List<Type> recvField(fromNbr);
// Pout<< "Receiving " << recvField.size()
// << " (" << curMap.size()
// << ") from " << domainI
// << " to " << Pstream::myProcNo()
// << endl;
// if (curMap.size() != recvField.size())
// {
// FatalErrorIn
// (
// "tmp<Field<Type> > ggiPolyPatch::fastExpand\n"
// "(\n"
// " const Field<Type>& ff\n"
// ") const"
// ) << "Expected from processor " << domainI << " size "
// << curMap.size() << " but received "
// << recvField.size() << " elements."
// << abort(FatalError);
// }
// forAll (curMap, i)
// {
// expandField[curMap[i]] = recvField[i];
// }
// }
// }
// return texpandField;
// }
// else
// {
// // Serial. Expand the field to zone size
// const labelList& zAddr = zoneAddressing();
// forAll (zAddr, i)
// {
// expandField[zAddr[i]] = ff[i];
// }
// }
#endif
}